Skip to content

Commit

Permalink
attempt to fix webserver lockup (#2788)
Browse files Browse the repository at this point in the history
  • Loading branch information
dr-bonez authored Nov 14, 2024
1 parent db6fc66 commit 46179f5
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 10 deletions.
2 changes: 1 addition & 1 deletion core/startos/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ tokio-util = { version = "0.7.9", features = ["io"] }
torut = { git = "https://github.com/Start9Labs/torut.git", branch = "update/dependencies", features = [
"serialize",
] }
tower-service = "0.3.2"
tower-service = "0.3.3"
tracing = "0.1.39"
tracing-error = "0.2.0"
tracing-futures = "0.2.5"
Expand Down
50 changes: 41 additions & 9 deletions core/startos/src/net/web_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use axum::extract::Request;
use axum::Router;
use axum_server::Handle;
use bytes::Bytes;
use futures::future::ready;
use futures::future::{ready, BoxFuture};
use futures::FutureExt;
use helpers::NonDetachingJoinHandle;
use tokio::sync::{oneshot, watch};
Expand All @@ -30,8 +30,39 @@ impl SwappableRouter {
}
}

#[derive(Clone)]
pub struct SwappableRouterService(watch::Receiver<Router>);
pub struct SwappableRouterService {
router: watch::Receiver<Router>,
changed: Option<BoxFuture<'static, ()>>,
}
impl SwappableRouterService {
fn router(&self) -> Router {
self.router.borrow().clone()
}
fn changed(&mut self, cx: &mut std::task::Context<'_>) -> Poll<()> {
let mut changed = if let Some(changed) = self.changed.take() {
changed
} else {
let mut router = self.router.clone();
async move {
router.changed().await;
}
.boxed()
};
if changed.poll_unpin(cx).is_ready() {
return Poll::Ready(());
}
self.changed = Some(changed);
Poll::Pending
}
}
impl Clone for SwappableRouterService {
fn clone(&self) -> Self {
Self {
router: self.router.clone(),
changed: None,
}
}
}
impl<B> tower_service::Service<Request<B>> for SwappableRouterService
where
B: axum::body::HttpBody<Data = Bytes> + Send + 'static,
Expand All @@ -42,15 +73,13 @@ where
type Future = <Router as tower_service::Service<Request<B>>>::Future;
#[inline]
fn poll_ready(&mut self, cx: &mut std::task::Context<'_>) -> Poll<Result<(), Self::Error>> {
let mut changed = self.0.changed().boxed();
if changed.poll_unpin(cx).is_ready() {
if self.changed(cx).is_ready() {
return Poll::Ready(Ok(()));
}
drop(changed);
tower_service::Service::<Request<B>>::poll_ready(&mut self.0.borrow().clone(), cx)
tower_service::Service::<Request<B>>::poll_ready(&mut self.router(), cx)
}
fn call(&mut self, req: Request<B>) -> Self::Future {
self.0.borrow().clone().call(req)
self.router().call(req)
}
}

Expand All @@ -66,7 +95,10 @@ impl<T> tower_service::Service<T> for SwappableRouter {
Poll::Ready(Ok(()))
}
fn call(&mut self, _: T) -> Self::Future {
ready(Ok(SwappableRouterService(self.0.subscribe())))
ready(Ok(SwappableRouterService {
router: self.0.subscribe(),
changed: None,
}))
}
}

Expand Down

0 comments on commit 46179f5

Please sign in to comment.