From a78a7fdf9b24db836f25eb27bf55058c8e8b48dd Mon Sep 17 00:00:00 2001
From: Marc Brinkmann
Date: Mon, 22 Jan 2024 15:11:30 +0100
Subject: [PATCH] Cleanup `process_wait_queue`

---
 CHANGELOG.md |  1 +
 src/io.rs    | 42 ++++++++++++------------------------------
 2 files changed, 13 insertions(+), 30 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index d71ccb5..92e5647 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -15,6 +15,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 ### Fixed
 
 * The IO layer will no longer drop frames if no multi-frame payloads are sent while a non-multi-frame payload has been moved to the wait queue due to exceeding the in-flight request limit.
+* The outgoing request queue will now process much faster when filled with large amounts of requests.
 
 ## [0.2.0] - 2023-11-24
 
diff --git a/src/io.rs b/src/io.rs
index 72badb1..6461924 100644
--- a/src/io.rs
+++ b/src/io.rs
@@ -792,9 +792,6 @@ where
                     // Once the scheduled frame is processed, we will finished the multi-frame
                     // transfer, so we can allow for the next multi-frame transfer to be scheduled.
                     self.active_multi_frame[about_to_finish.channel().get() as usize] = None;
-
-                    // Ensure the next multi-frame message gets scheduled, if available.
-                    self.process_wait_queue(about_to_finish.channel())?;
                 }
             }
         }
@@ -805,43 +802,28 @@ where
 
     /// Process the wait queue of a given channel, promoting messages that are ready to be sent.
     fn process_wait_queue(&mut self, channel: ChannelId) -> Result<(), LocalProtocolViolation> {
-        let chan = &mut self.wait_queue[channel.get() as usize];
-        let mut max_items_to_check = chan.len();
-        #[cfg(feature = "tracing")]
-        tracing::trace!(%channel, "processing dirty channel");
-
-        // The code below is not as bad it looks complexity wise, anticipating two common cases:
-        //
-        // 1. A multi-frame read has finished, with capacity for requests to spare. Only
-        //    multi-frame requests will be waiting in the wait queue, so we will likely pop the
-        //    first item, only scanning the rest once.
-        // 2. One or more requests finished, so we also have a high chance of picking the first
-        //    few requests out of the queue.
-        let mut ready = Vec::new();
-
-        while let Some(item) = chan.pop_front() {
+        let mut remaining = self.wait_queue[channel.get() as usize].len();
+
+        while let Some(item) = self.wait_queue[channel.get() as usize].pop_front() {
             if item_should_wait(&item, &self.juliet, &self.active_multi_frame)?.is_some() {
-                #[cfg(feature = "tracing")]
-                tracing::trace!(%item, "still waiting");
                 // Put it right back into the queue.
-                chan.push_back(item);
+                self.wait_queue[channel.get() as usize].push_back(item);
             } else {
-                #[cfg(feature = "tracing")]
-                tracing::debug!(%item, "became ready");
-                ready.push(item);
+                self.send_to_ready_queue(item)?;
+
+                // No need to look further if we have saturated the channel.
+                if !self.juliet.allowed_to_send_request(channel)? {
+                    break;
+                }
             }
 
             // Ensure we do not loop endlessly if we cannot find anything.
-            max_items_to_check -= 1;
-            if max_items_to_check == 0 {
+            remaining -= 1;
+            if remaining == 0 {
                 break;
             }
         }
 
-        for item in ready {
-            self.send_to_ready_queue(item)?;
-        }
-
         Ok(())
     }
 }