From 4568083ea4632dbbcb8403ac7feafcab3140559b Mon Sep 17 00:00:00 2001 From: Vitali Lovich Date: Fri, 10 May 2024 20:43:26 -0700 Subject: [PATCH 1/2] Improve Gate::close to support repeated calls If Gate::close has a timeout, closing the gate is left in a dangling state with no way to safely resume waiting. This change makes it safe in that calling close on a closing Gate still waits for any outstanding tasks. This way you can repeatedly try calling gate close with a timeout set while safely waiting for all tasks to complete. This is technically an observable change in that a previous call to close when still in "closing" would have returned an error but now waits for the currently running tasks instead. I can't imagine anyone actually relies too much on this. --- glommio/src/sync/gate.rs | 121 ++++++++++++++++++++++++++++++++++----- 1 file changed, 108 insertions(+), 13 deletions(-) diff --git a/glommio/src/sync/gate.rs b/glommio/src/sync/gate.rs index 926097b19..db8876f36 100644 --- a/glommio/src/sync/gate.rs +++ b/glommio/src/sync/gate.rs @@ -83,15 +83,26 @@ impl Gate { ) } - /// Close the gate, and wait for all spawned tasks to complete + /// Close the gate, and wait for all spawned tasks to complete. If the gate is currently closing, this will wait + /// for it to close before returning a success. This is particularly useful if you might have a timeout on the close + /// - the would otherwise be no safe way to retry & wait for remaining tasks to finish. pub async fn close(&self) -> Result<(), GlommioError<()>> { self.inner.close().await } - /// Whether the gate is open or not + /// Whether the gate is open or not. pub fn is_open(&self) -> bool { self.inner.is_open() } + + /// This returns true only if [Self::close] has been called and all spawned tasks are complete. If it returns false, + /// you may call [Self::close] without it returning an error and it'll wait for all spawned tasks to complete. + /// + /// NOTE: multiple concurrent calls to [Self::close] may be a performance issue since each invocation to close will + /// allocate some nominal amount of memory for the channel underneath. + pub fn is_closed(&self) -> bool { + self.inner.is_closed() + } } #[derive(Debug)] @@ -125,17 +136,33 @@ impl GateInner { } pub async fn close(&self) -> Result<(), GlommioError<()>> { - if self.is_open() { - if self.count.get() == 0 { - *self.state.borrow_mut() = State::Closed; - } else { + match self.state.replace(State::Closed) { + State::Open => { + if self.count.get() != 0 { + let (sender, receiver) = local_channel::new_bounded(1); + self.state.replace(State::Closing(sender)); + receiver.recv().await; + } + Ok(()) + } + State::Closing(previous_closer) => { + assert!( + self.count.get() != 0, + "If count is 0 then the state should have been marked as closed" + ); + assert!( + !previous_closer.is_full(), + "Already notified that the gate is closed!" + ); + let (sender, receiver) = local_channel::new_bounded(1); - *self.state.borrow_mut() = State::Closing(sender); + self.state.replace(State::Closing(sender)); + receiver.recv().await; + let _ = previous_closer.try_send(true); + Ok(()) } - Ok(()) - } else { - Err(GlommioError::Closed(ResourceType::Gate)) + State::Closed => Err(GlommioError::Closed(ResourceType::Gate)), } } @@ -143,6 +170,10 @@ impl GateInner { matches!(*self.state.borrow(), State::Open) } + pub fn is_closed(&self) -> bool { + matches!(*self.state.borrow(), State::Closed) + } + pub fn notify_closed(&self) { if let State::Closing(sender) = self.state.replace(State::Closed) { sender.try_send(true).unwrap(); @@ -154,10 +185,11 @@ impl GateInner { #[cfg(test)] mod tests { - use crate::{enclose, LocalExecutor}; - use super::*; use crate::sync::Semaphore; + use crate::{enclose, timer::timeout, LocalExecutor}; + use futures::join; + use std::time::Duration; #[test] fn test_immediate_close() { @@ -168,7 +200,9 @@ mod tests { gate.close().await.unwrap(); assert!(!gate.is_open()); - assert!(gate.spawn(async {}).is_err()) + assert!(gate.spawn(async {}).is_err()); + + assert!(gate.close().await.is_err()); }) } @@ -229,4 +263,65 @@ mod tests { assert!(!running.get()); }) } + + #[test] + fn test_concurrent_close() { + LocalExecutor::default().run(async { + let gate = &Gate::new(); + let gate_closures = &Semaphore::new(0); + let closed = &RefCell::new(false); + + let pass = gate.enter().unwrap(); + + join!( + async { + gate_closures.signal(1); + gate.close().await.unwrap(); + assert!(*closed.borrow()); + }, + async { + gate_closures.signal(1); + gate.close().await.unwrap(); + assert!(*closed.borrow()); + }, + async { + gate_closures.acquire(2).await.unwrap(); + drop(pass); + closed.replace(true); + }, + ); + }) + } + + #[test] + fn test_close_after_timed_out_close() { + LocalExecutor::default().run(async { + let gate = Gate::new(); + let gate = &gate; + let gate_closed_once = Rc::new(Semaphore::new(0)); + let task_gate = gate_closed_once.clone(); + + let _task = gate + .spawn(async move { + task_gate.acquire(1).await.unwrap(); + }) + .unwrap(); + + timeout(Duration::from_millis(1), async move { + gate.close().await.unwrap(); + Ok(()) + }) + .await + .expect_err("Should have timed out"); + + assert!( + !gate.is_closed(), + "Should still be waiting for a task that hasn't finished" + ); + + gate_closed_once.signal(1); + + gate.close().await.unwrap(); + }) + } } From 1ca99f4b41f79a41da31447b8afc31b60fa29ad3 Mon Sep 17 00:00:00 2001 From: Vitali Lovich Date: Sat, 11 May 2024 09:23:11 -0700 Subject: [PATCH 2/2] Make the Gate::close synchronous returning future It's useful if the pattern you want to do is to initiate a close and then wait for it completing later. Otherwise you sometimes have to jump through hoops. --- glommio/src/sync/gate.rs | 73 +++++++++++++++++++++++++++++++++------- 1 file changed, 60 insertions(+), 13 deletions(-) diff --git a/glommio/src/sync/gate.rs b/glommio/src/sync/gate.rs index db8876f36..704ee7a3b 100644 --- a/glommio/src/sync/gate.rs +++ b/glommio/src/sync/gate.rs @@ -6,7 +6,7 @@ use std::{ use futures_lite::Future; use crate::{ - channels::local_channel::{self, LocalSender}, + channels::local_channel::{self, LocalReceiver, LocalSender}, GlommioError, ResourceType, Task, TaskQueueHandle, }; @@ -83,11 +83,15 @@ impl Gate { ) } - /// Close the gate, and wait for all spawned tasks to complete. If the gate is currently closing, this will wait - /// for it to close before returning a success. This is particularly useful if you might have a timeout on the close - /// - the would otherwise be no safe way to retry & wait for remaining tasks to finish. - pub async fn close(&self) -> Result<(), GlommioError<()>> { - self.inner.close().await + /// Close the gate, and return a waiter for all spawned tasks to complete. If the gate is currently closing, the + /// returned future will wait for it to close before returning a success. This is particularly useful if you might + /// have a timeout on the close - the would otherwise be no safe way to retry & wait for remaining tasks to finish. + /// + /// NOTE: After this function returns, [is_open](Self::is_open) returns false and any subsequent attempts to acquire + /// a pass will fail, even if you drop the future. The future will return an error if and only if the gate is + /// already fully closed + pub fn close(&self) -> impl Future>> { + self.inner.close() } /// Whether the gate is open or not. @@ -105,6 +109,9 @@ impl Gate { } } +type PreviousWaiter = Option>; +type CurrentClosure = LocalReceiver; + #[derive(Debug)] struct GateInner { count: Cell, @@ -135,15 +142,30 @@ impl GateInner { } } - pub async fn close(&self) -> Result<(), GlommioError<()>> { + async fn wait_for_closure( + waiter: Result, GlommioError<()>>, + ) -> Result<(), GlommioError<()>> { + if let Some((waiter, previous_closer)) = waiter? { + waiter.recv().await; + if let Some(previous_closer) = previous_closer { + // Previous channel may be dropped so ignore the result. + let _ = previous_closer.try_send(true); + } + } + + Ok(()) + } + + pub fn close(&self) -> impl Future>> { match self.state.replace(State::Closed) { State::Open => { if self.count.get() != 0 { let (sender, receiver) = local_channel::new_bounded(1); self.state.replace(State::Closing(sender)); - receiver.recv().await; + Self::wait_for_closure(Ok(Some((receiver, None)))) + } else { + Self::wait_for_closure(Ok(None)) } - Ok(()) } State::Closing(previous_closer) => { assert!( @@ -158,11 +180,9 @@ impl GateInner { let (sender, receiver) = local_channel::new_bounded(1); self.state.replace(State::Closing(sender)); - receiver.recv().await; - let _ = previous_closer.try_send(true); - Ok(()) + Self::wait_for_closure(Ok(Some((receiver, Some(previous_closer))))) } - State::Closed => Err(GlommioError::Closed(ResourceType::Gate)), + State::Closed => Self::wait_for_closure(Err(GlommioError::Closed(ResourceType::Gate))), } } @@ -324,4 +344,31 @@ mod tests { gate.close().await.unwrap(); }) } + + #[test] + fn test_marked_closed_without_waiting() { + LocalExecutor::default().run(async { + let gate = Gate::new(); + // Even if task is immediately cancelled, the gate still closes. + drop(gate.close()); + assert!(gate.is_closed()); + + let gate = Gate::new(); + let pass = gate.enter().unwrap(); + // Even if task is cancelled, the gate is still marked as closing. + drop(gate.close()); + assert!(!gate.is_open()); + assert!(!gate.is_closed()); + // Here we install a waiter after the aborted cancel. + let wait_for_closure = gate.close(); + join!( + async move { + drop(pass); + }, + async move { + wait_for_closure.await.unwrap(); + } + ); + }) + } }