Skip to content
This repository has been archived by the owner on Aug 25, 2021. It is now read-only.

Refactor PeersService to more idiomatic async/.await (#27) #110

Merged
merged 5 commits into from
Jun 23, 2020
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 31 additions & 31 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions src/media/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,7 @@ impl Peer<New> {
&mut self,
src: &WebRtcPublishEndpoint,
publisher_peer: &mut Peer<New>,
tracks_counter: &mut Counter<TrackId>,
tracks_counter: &Counter<TrackId>,
) {
let audio_settings = src.audio_settings();
if audio_settings.publish_policy != PublishPolicy::Disabled {
Expand Down Expand Up @@ -528,7 +528,7 @@ pub mod tests {
},
};

let mut track_id_counter = Counter::default();
let track_id_counter = Counter::default();

for _ in 0..send_audio {
let track_id = track_id_counter.next_id();
Expand Down
11 changes: 1 addition & 10 deletions src/signalling/elements/endpoints/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,23 +17,14 @@ use crate::signalling::elements::endpoints::webrtc::{
///
/// [Medea]: https://github.com/instrumentisto/medea
#[enum_delegate(pub fn is_force_relayed(&self) -> bool)]
#[enum_delegate(pub fn has_traffic_callback(&self) -> bool)]
#[derive(Clone, Debug, From)]
pub enum Endpoint {
WebRtcPublishEndpoint(webrtc::WebRtcPublishEndpoint),
WebRtcPlayEndpoint(webrtc::WebRtcPlayEndpoint),
}

impl Endpoint {
/// Returns `true` if `on_start` or `on_stop` callback is set.
#[allow(clippy::unused_self)]
#[inline]
pub fn has_traffic_callback(&self) -> bool {
// TODO: Delegate this call to
// `WebRtcPublishEndpoint`/`WebRtcPlayEndpoint`.

false
}

/// Returns [`Weak`] reference to this [`Endpoint`].
pub fn downgrade(&self) -> WeakEndpoint {
match self {
Expand Down
8 changes: 8 additions & 0 deletions src/signalling/elements/endpoints/webrtc/play_endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,14 @@ impl WebRtcPlayEndpoint {
self.0.borrow().is_force_relayed
}

/// Returns `true` if `on_start` or `on_stop` callback is set.
#[allow(clippy::unused_self)]
pub fn has_traffic_callback(&self) -> bool {
// TODO: Must depend on on_start/on_stop endpoint callbacks, when those
// will be added (#91).
true
}

/// Downgrades [`WebRtcPlayEndpoint`] to [`WeakWebRtcPlayEndpoint`] weak
/// pointer.
pub fn downgrade(&self) -> WeakWebRtcPlayEndpoint {
Expand Down
8 changes: 8 additions & 0 deletions src/signalling/elements/endpoints/webrtc/publish_endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,14 @@ impl WebRtcPublishEndpoint {
self.0.borrow().is_force_relayed
}

/// Returns `true` if `on_start` or `on_stop` callback is set.
#[allow(clippy::unused_self)]
pub fn has_traffic_callback(&self) -> bool {
// TODO: Must depend on on_start/on_stop endpoint callbacks, when those
// will be added (#91).
true
}

/// Returns [`AudioSettings`] of this [`WebRtcPublishEndpoint`].
pub fn audio_settings(&self) -> AudioSettings {
self.0.borrow().audio_settings
Expand Down
34 changes: 29 additions & 5 deletions src/signalling/peers/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ impl PeersMetricsService {
///
/// Sends [`PeersMetricsEvent::NoTrafficFlow`] message if it determines that
/// some track is not flowing.
pub fn check_peers(&mut self) {
pub fn check_peers(&self) {
for peer in self
.peers
.values()
Expand Down Expand Up @@ -202,7 +202,7 @@ impl PeersMetricsService {
/// [`PeersMetricsEvent::WrongTrafficFlowing`] or [`PeersMetricsEvent::
/// TrackTrafficStarted`] to the [`Room`] if some
/// [`MediaType`]/[`Direction`] was stopped.
pub fn add_stats(&mut self, peer_id: PeerId, stats: Vec<RtcStat>) {
pub fn add_stats(&self, peer_id: PeerId, stats: Vec<RtcStat>) {
if let Some(peer) = self.peers.get(&peer_id) {
let mut peer_ref = peer.borrow_mut();

Expand Down Expand Up @@ -409,7 +409,7 @@ pub enum PeersMetricsEvent {
///
/// This spec is compared with [`Peer`]s actual stats, to calculate difference
/// between expected and actual [`Peer`] state.
#[derive(Debug)]
#[derive(Clone, Copy, Debug)]
struct PeerTracks {
/// Count of the [`MediaTrack`]s with the [`Direction::Publish`] and
/// [`MediaType::Audio`].
Expand Down Expand Up @@ -860,6 +860,30 @@ mod tests {

use super::PeersMetricsService;

impl PeersMetricsService {
/// Returns `true` if `Peer` with a provided [`PeerId`] isn't registered
/// in the [`PeersMetricsService`].
pub fn is_peer_registered(&self, peer_id: PeerId) -> bool {
self.peers.contains_key(&peer_id)
}

/// Returns count of the `MediaTrack` which are registerd in the
/// [`PeersMetricsService`].
pub fn peer_tracks_count(&self, peer_id: PeerId) -> usize {
if let Some(peer) = self.peers.get(&peer_id) {
let peer_tracks = peer.borrow().tracks_spec;
let mut tracks_count = 0;
tracks_count += peer_tracks.audio_recv;
tracks_count += peer_tracks.video_recv;
tracks_count += peer_tracks.audio_send;
tracks_count += peer_tracks.video_send;
tracks_count
} else {
0
}
}
}

/// Returns [`RtcOutboundRtpStreamStats`] with a provided number of
/// `packets_sent` and [`RtcOutboundRtpStreamMediaType`] based on
/// `is_audio`.
Expand Down Expand Up @@ -1009,7 +1033,7 @@ mod tests {
/// Generates [`RtcStats`] and adds them to inner
/// [`PeersMetricsService`] for `PeerId(1)`.
pub fn add_stats(
&mut self,
&self,
send_audio: u32,
send_video: u32,
recv_audio: u32,
Expand Down Expand Up @@ -1112,7 +1136,7 @@ mod tests {
}

/// Calls [`PeerMetricsService::check_peers`].
pub fn check_peers(&mut self) {
pub fn check_peers(&self) {
self.metrics.check_peers();
}

Expand Down
Loading