Skip to content

Commit

Permalink
fix: ensure pending packets are not lost on EventLoop::clean (#780)
Browse files Browse the repository at this point in the history
BREAKING: `EventLoop.pending` uses a `VecDeque` instead of `IntoIter`

* fix: use `Vec::extend` to not lost pending

* refactor: remove unnecessary unwrap

* style: clippy suggestions

* doc: note unchecked growth of pending

* fix: `VecDeque` preserves ordering

Also reverts 8406c4c

* doc: note for `EventLoop::clean`

* fix: same bug in `mod v5`

* doc: add changelog entry
  • Loading branch information
Devdutt Shenoi authored Jan 4, 2024
1 parent 65a18f6 commit c63de08
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 31 deletions.
3 changes: 2 additions & 1 deletion rumqttc/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Removed the `Key` enum: users do not need to specify the TLS key variant in the `TlsConfiguration` anymore, this is inferred automatically.
To update your code simply remove `Key::ECC()` or `Key::RSA()` from the initialization.
- certificate for client authentication is now optional while using native-tls. `der` & `password` fields are replaced by `client_auth`.
- Make v5 `RetainForwardRule` public, in order to allow setting it when constructing `Filter` values.
- Make v5 `RetainForwardRule` public, in order to allow setting it when constructing `Filter` values.
- Use `VecDeque` instead of `IntoIter` to fix unintentional drop of pending requests on `EventLoop::clean` (#780)

### Deprecated

Expand Down
30 changes: 14 additions & 16 deletions rumqttc/src/eventloop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,11 @@ use tokio::net::{lookup_host, TcpSocket, TcpStream};
use tokio::select;
use tokio::time::{self, Instant, Sleep};

use std::collections::VecDeque;
use std::io;
use std::net::SocketAddr;
use std::pin::Pin;
use std::time::Duration;
use std::vec::IntoIter;

#[cfg(unix)]
use {std::path::Path, tokio::net::UnixStream};
Expand Down Expand Up @@ -79,7 +79,7 @@ pub struct EventLoop {
/// Requests handle to send requests
pub(crate) requests_tx: Sender<Request>,
/// Pending packets from last session
pub pending: IntoIter<Request>,
pub pending: VecDeque<Request>,
/// Network connection to the broker
network: Option<Network>,
/// Keep alive time
Expand All @@ -101,8 +101,7 @@ impl EventLoop {
/// access and update `options`, `state` and `requests`.
pub fn new(mqtt_options: MqttOptions, cap: usize) -> EventLoop {
let (requests_tx, requests_rx) = bounded(cap);
let pending = Vec::new();
let pending = pending.into_iter();
let pending = VecDeque::new();
let max_inflight = mqtt_options.inflight;
let manual_acks = mqtt_options.manual_acks;
let max_outgoing_packet_size = mqtt_options.max_outgoing_packet_size;
Expand All @@ -123,18 +122,17 @@ impl EventLoop {
/// republished in the next session. Move pending messages from state to eventloop, drops the
/// underlying network connection and clears the keepalive timeout if any.
///
/// NOTE: Use only when EventLoop is blocked on network and unable to immediately handle disconnect
/// > NOTE: Use only when EventLoop is blocked on network and unable to immediately handle disconnect.
/// > Also, while this helps prevent data loss, the pending list length should be managed properly.
/// > For this reason we recommend setting [`AsycClient`](crate::AsyncClient)'s channel capacity to `0`.
pub fn clean(&mut self) {
self.network = None;
self.keepalive_timeout = None;
let mut pending = self.state.clean();
self.pending.extend(self.state.clean());

// drain requests from channel which weren't yet received
// this helps in preventing data loss
let requests_in_channel = self.requests_rx.drain();
pending.extend(requests_in_channel);

self.pending = pending.into_iter();
self.pending.extend(requests_in_channel);
}

/// Yields Next notification or outgoing request and periodically pings
Expand Down Expand Up @@ -229,7 +227,7 @@ impl EventLoop {
&mut self.pending,
&self.requests_rx,
self.mqtt_options.pending_throttle
), if self.pending.len() > 0 || (!inflight_full && !collision) => match o {
), if !self.pending.is_empty() || (!inflight_full && !collision) => match o {
Ok(request) => {
self.state.handle_outgoing_packet(request)?;
match time::timeout(network_timeout, network.flush(&mut self.state.write)).await {
Expand Down Expand Up @@ -267,15 +265,15 @@ impl EventLoop {
}

async fn next_request(
pending: &mut IntoIter<Request>,
pending: &mut VecDeque<Request>,
rx: &Receiver<Request>,
pending_throttle: Duration,
) -> Result<Request, ConnectionError> {
if pending.len() > 0 {
if !pending.is_empty() {
time::sleep(pending_throttle).await;
// We must call .next() AFTER sleep() otherwise .next() would
// advance the iterator but the future might be canceled before return
Ok(pending.next().unwrap())
// We must call .pop_front() AFTER sleep() otherwise we would have
// advanced the iterator but the future might be canceled before return
Ok(pending.pop_front().unwrap())
} else {
match rx.recv_async().await {
Ok(r) => Ok(r),
Expand Down
26 changes: 12 additions & 14 deletions rumqttc/src/v5/eventloop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,11 @@ use flume::{bounded, Receiver, Sender};
use tokio::select;
use tokio::time::{self, error::Elapsed, Instant, Sleep};

use std::collections::VecDeque;
use std::convert::TryInto;
use std::io;
use std::pin::Pin;
use std::time::Duration;
use std::vec::IntoIter;

use super::mqttbytes::v5::ConnectReturnCode;

Expand Down Expand Up @@ -78,7 +78,7 @@ pub struct EventLoop {
/// Requests handle to send requests
pub(crate) requests_tx: Sender<Request>,
/// Pending packets from last session
pub pending: IntoIter<Request>,
pub pending: VecDeque<Request>,
/// Network connection to the broker
network: Option<Network>,
/// Keep alive time
Expand All @@ -99,8 +99,7 @@ impl EventLoop {
/// access and update `options`, `state` and `requests`.
pub fn new(options: MqttOptions, cap: usize) -> EventLoop {
let (requests_tx, requests_rx) = bounded(cap);
let pending = Vec::new();
let pending = pending.into_iter();
let pending = VecDeque::new();
let inflight_limit = options.outgoing_inflight_upper_limit.unwrap_or(u16::MAX);
let manual_acks = options.manual_acks;

Expand All @@ -119,18 +118,17 @@ impl EventLoop {
/// republished in the next session. Move pending messages from state to eventloop, drops the
/// underlying network connection and clears the keepalive timeout if any.
///
/// NOTE: Use only when EventLoop is blocked on network and unable to immediately handle disconnect
/// > NOTE: Use only when EventLoop is blocked on network and unable to immediately handle disconnect.
/// > Also, while this helps prevent data loss, the pending list length should be managed properly.
/// > For this reason we recommend setting [`AsycClient`](super::AsyncClient)'s channel capacity to `0`.
pub fn clean(&mut self) {
self.network = None;
self.keepalive_timeout = None;
let mut pending = self.state.clean();
self.pending.extend(self.state.clean());

// drain requests from channel which weren't yet received
// this helps in preventing data loss
let requests_in_channel = self.requests_rx.drain();
pending.extend(requests_in_channel);

self.pending = pending.into_iter();
self.pending.extend(requests_in_channel);
}

/// Yields Next notification or outgoing request and periodically pings
Expand Down Expand Up @@ -210,7 +208,7 @@ impl EventLoop {
&mut self.pending,
&self.requests_rx,
self.options.pending_throttle
), if self.pending.len() > 0 || (!inflight_full && !collision) => match o {
), if !self.pending.is_empty() || (!inflight_full && !collision) => match o {
Ok(request) => {
self.state.handle_outgoing_packet(request)?;
network.flush(&mut self.state.write).await?;
Expand Down Expand Up @@ -239,15 +237,15 @@ impl EventLoop {
}

async fn next_request(
pending: &mut IntoIter<Request>,
pending: &mut VecDeque<Request>,
rx: &Receiver<Request>,
pending_throttle: Duration,
) -> Result<Request, ConnectionError> {
if pending.len() > 0 {
if !pending.is_empty() {
time::sleep(pending_throttle).await;
// We must call .next() AFTER sleep() otherwise .next() would
// advance the iterator but the future might be canceled before return
Ok(pending.next().unwrap())
Ok(pending.pop_front().unwrap())
} else {
match rx.recv_async().await {
Ok(r) => Ok(r),
Expand Down

0 comments on commit c63de08

Please sign in to comment.