Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Don't timeout a server connection while in a transaction. #399

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 28 additions & 10 deletions src/net/server/connection.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
//! Support for stream based connections.
use core::ops::{ControlFlow, Deref};
use core::sync::atomic::{AtomicBool, Ordering};
use core::time::Duration;

use std::fmt::Display;
Expand Down Expand Up @@ -264,6 +265,9 @@ where
/// DNS protocol idle time out tracking.
idle_timer: IdleTimer,

/// Is a transaction in progress?
in_transaction: Arc<AtomicBool>,

/// [`ServerMetrics`] describing the status of the server.
metrics: Arc<ServerMetrics>,
}
Expand Down Expand Up @@ -312,6 +316,7 @@ where
mpsc::channel(config.max_queued_responses);
let config = Arc::new(ArcSwap::from_pointee(config));
let idle_timer = IdleTimer::new();
let in_transaction = Arc::new(AtomicBool::new(false));

// Place the ReadHalf of the stream into an Option so that we can take
// it out (as we can't clone it and we can't place it into an Arc
Expand All @@ -333,6 +338,7 @@ where
result_q_tx,
service,
idle_timer,
in_transaction,
metrics,
}
}
Expand Down Expand Up @@ -423,7 +429,7 @@ where
}

_ = sleep_until(self.idle_timer.idle_timeout_deadline(self.config.load().idle_timeout)) => {
self.process_dns_idle_timeout()
self.process_dns_idle_timeout(self.config.load().idle_timeout)
}

res = &mut msg_recv => {
Expand Down Expand Up @@ -630,16 +636,19 @@ where
}
}

/// Implemnt DNS rules regarding timing out of idle connections.
/// Implement DNS rules regarding timing out of idle connections.
///
/// Disconnects the current connection of the timer is expired, flushing
/// pending responses first.
fn process_dns_idle_timeout(&self) -> Result<(), ConnectionEvent> {
fn process_dns_idle_timeout(
&self,
timeout: Duration,
) -> Result<(), ConnectionEvent> {
// DNS idle timeout elapsed, or was it reset?
if self
.idle_timer
.idle_timeout_expired(self.config.load().idle_timeout)
if self.idle_timer.idle_timeout_expired(timeout)
&& !self.in_transaction.load(Ordering::SeqCst)
{
trace!("Timing out idle connection");
Err(ConnectionEvent::DisconnectWithoutFlush)
} else {
Ok(())
Expand All @@ -654,6 +663,8 @@ where
where
Svc::Stream: Send,
{
let in_transaction = self.in_transaction.clone();

match res {
Ok(buf) => {
let received_at = Instant::now();
Expand Down Expand Up @@ -706,7 +717,6 @@ where
"Calling service for request id {request_id}"
);
let mut stream = svc.call(request).await;
let mut in_transaction = false;

trace!("Awaiting service call results for request id {request_id}");
while let Some(Ok(call_result)) =
Expand Down Expand Up @@ -738,11 +748,17 @@ where
}

ServiceFeedback::BeginTransaction => {
in_transaction = true;
in_transaction.store(
true,
Ordering::SeqCst,
);
}

ServiceFeedback::EndTransaction => {
in_transaction = false;
in_transaction.store(
false,
Ordering::SeqCst,
);
}
}
}
Expand Down Expand Up @@ -772,7 +788,9 @@ where
Err(TrySendError::Full(
unused_response,
)) => {
if in_transaction {
if in_transaction
.load(Ordering::SeqCst)
{
// Wait until there is space in the message queue.
tokio::task::yield_now()
.await;
Expand Down