Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve Gate::close to support repeated calls #662

Merged
merged 2 commits into from
May 13, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
178 changes: 160 additions & 18 deletions glommio/src/sync/gate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use std::{
use futures_lite::Future;

use crate::{
channels::local_channel::{self, LocalSender},
channels::local_channel::{self, LocalReceiver, LocalSender},
GlommioError, ResourceType, Task, TaskQueueHandle,
};

Expand Down Expand Up @@ -83,17 +83,35 @@ impl Gate {
)
}

/// Close the gate, and wait for all spawned tasks to complete
pub async fn close(&self) -> Result<(), GlommioError<()>> {
self.inner.close().await
/// Close the gate, and return a waiter for all spawned tasks to complete. If the gate is currently closing, the
/// returned future will wait for it to close before returning a success. This is particularly useful if you might
/// have a timeout on the close - the would otherwise be no safe way to retry & wait for remaining tasks to finish.
///
/// NOTE: After this function returns, [is_open](Self::is_open) returns false and any subsequent attempts to acquire
/// a pass will fail, even if you drop the future. The future will return an error if and only if the gate is
/// already fully closed
pub fn close(&self) -> impl Future<Output = Result<(), GlommioError<()>>> {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be Future<Output = Result> or Result<Output = Future>? The latter is a cleaner API but the approach taken here means the users of the library don't need to make any code changes when upgrading. On the other hand, they may be silently impacted by a slight behavioral change in the mechanics of the close which may or may not be important.

I went with the "no code change required to upgrade" approach since I figure the behavioral change is unlikely to be noticed but let me know which approach is preferrable.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think what you did is better.

I don't think the latter is a cleaner API, anyway, but since you're not pushing for it, what I think is immaterial =)

self.inner.close()
}

/// Whether the gate is open or not
/// Whether the gate is open or not.
pub fn is_open(&self) -> bool {
self.inner.is_open()
}

/// This returns true only if [Self::close] has been called and all spawned tasks are complete. If it returns false,
/// you may call [Self::close] without it returning an error and it'll wait for all spawned tasks to complete.
///
/// NOTE: multiple concurrent calls to [Self::close] may be a performance issue since each invocation to close will
/// allocate some nominal amount of memory for the channel underneath.
pub fn is_closed(&self) -> bool {
self.inner.is_closed()
}
}

type PreviousWaiter = Option<LocalSender<bool>>;
type CurrentClosure = LocalReceiver<bool>;

#[derive(Debug)]
struct GateInner {
count: Cell<usize>,
Expand Down Expand Up @@ -124,25 +142,58 @@ impl GateInner {
}
}

pub async fn close(&self) -> Result<(), GlommioError<()>> {
if self.is_open() {
if self.count.get() == 0 {
*self.state.borrow_mut() = State::Closed;
} else {
async fn wait_for_closure(
waiter: Result<Option<(CurrentClosure, PreviousWaiter)>, GlommioError<()>>,
) -> Result<(), GlommioError<()>> {
if let Some((waiter, previous_closer)) = waiter? {
waiter.recv().await;
if let Some(previous_closer) = previous_closer {
// Previous channel may be dropped so ignore the result.
let _ = previous_closer.try_send(true);
}
}

Ok(())
}

pub fn close(&self) -> impl Future<Output = Result<(), GlommioError<()>>> {
match self.state.replace(State::Closed) {
State::Open => {
if self.count.get() != 0 {
let (sender, receiver) = local_channel::new_bounded(1);
self.state.replace(State::Closing(sender));
Self::wait_for_closure(Ok(Some((receiver, None))))
} else {
Self::wait_for_closure(Ok(None))
}
}
State::Closing(previous_closer) => {
assert!(
self.count.get() != 0,
"If count is 0 then the state should have been marked as closed"
);
assert!(
!previous_closer.is_full(),
"Already notified that the gate is closed!"
);

let (sender, receiver) = local_channel::new_bounded(1);
*self.state.borrow_mut() = State::Closing(sender);
receiver.recv().await;
self.state.replace(State::Closing(sender));

Self::wait_for_closure(Ok(Some((receiver, Some(previous_closer)))))
}
Ok(())
} else {
Err(GlommioError::Closed(ResourceType::Gate))
State::Closed => Self::wait_for_closure(Err(GlommioError::Closed(ResourceType::Gate))),
}
}

pub fn is_open(&self) -> bool {
matches!(*self.state.borrow(), State::Open)
}

pub fn is_closed(&self) -> bool {
matches!(*self.state.borrow(), State::Closed)
}

pub fn notify_closed(&self) {
if let State::Closing(sender) = self.state.replace(State::Closed) {
sender.try_send(true).unwrap();
Expand All @@ -154,10 +205,11 @@ impl GateInner {

#[cfg(test)]
mod tests {
use crate::{enclose, LocalExecutor};

use super::*;
use crate::sync::Semaphore;
use crate::{enclose, timer::timeout, LocalExecutor};
use futures::join;
use std::time::Duration;

#[test]
fn test_immediate_close() {
Expand All @@ -168,7 +220,9 @@ mod tests {
gate.close().await.unwrap();
assert!(!gate.is_open());

assert!(gate.spawn(async {}).is_err())
assert!(gate.spawn(async {}).is_err());

assert!(gate.close().await.is_err());
})
}

Expand Down Expand Up @@ -229,4 +283,92 @@ mod tests {
assert!(!running.get());
})
}

#[test]
fn test_concurrent_close() {
LocalExecutor::default().run(async {
let gate = &Gate::new();
let gate_closures = &Semaphore::new(0);
let closed = &RefCell::new(false);

let pass = gate.enter().unwrap();

join!(
async {
gate_closures.signal(1);
gate.close().await.unwrap();
assert!(*closed.borrow());
},
async {
gate_closures.signal(1);
gate.close().await.unwrap();
assert!(*closed.borrow());
},
async {
gate_closures.acquire(2).await.unwrap();
drop(pass);
closed.replace(true);
},
);
})
}

#[test]
fn test_close_after_timed_out_close() {
LocalExecutor::default().run(async {
let gate = Gate::new();
let gate = &gate;
let gate_closed_once = Rc::new(Semaphore::new(0));
let task_gate = gate_closed_once.clone();

let _task = gate
.spawn(async move {
task_gate.acquire(1).await.unwrap();
})
.unwrap();

timeout(Duration::from_millis(1), async move {
gate.close().await.unwrap();
Ok(())
})
.await
.expect_err("Should have timed out");

assert!(
!gate.is_closed(),
"Should still be waiting for a task that hasn't finished"
);

gate_closed_once.signal(1);

gate.close().await.unwrap();
})
}

#[test]
fn test_marked_closed_without_waiting() {
LocalExecutor::default().run(async {
let gate = Gate::new();
// Even if task is immediately cancelled, the gate still closes.
drop(gate.close());
assert!(gate.is_closed());

let gate = Gate::new();
let pass = gate.enter().unwrap();
// Even if task is cancelled, the gate is still marked as closing.
drop(gate.close());
assert!(!gate.is_open());
assert!(!gate.is_closed());
// Here we install a waiter after the aborted cancel.
let wait_for_closure = gate.close();
join!(
async move {
drop(pass);
},
async move {
wait_for_closure.await.unwrap();
}
);
})
}
}
Loading