feat(rebaser): change quiescent shutdown to reduce missed activity
This change alters the logic that lets a change set "process" task
shut down when no Rebaser requests have been seen over our
`quiescent_period`. Prior to this change there was a shutdown window
during which the `ChangeSetProcessorTask` would not be looking for new
Rebaser requests to process while waiting for the `SerialDvuTask` to
end. As a hedge against this scenario, the process task handler checks
the change set subject just before ending to ensure that, if at least
one request message remains, we don't ack/delete the task.

In this altered version of a quiescent shutdown we notice the quiet
period as before in the Rebaser requests subscription stream. However,
now a `quiesced_notify` `tokio::sync::Notify` is fired to signal the
`SerialDvuTask`. Then the `ChangeSetProcessorTask` continues to process
any further requests that may show up (remember that after running a
"dvu" job, another Rebaser request is often submitted). Meanwhile, the
`SerialDvuTask` will continue to run "dvu" jobs as long as
`run_dvu_notify` has been set (in effect "draining" any pending runs),
and only then will it check whether `quiesced_notify` has been set. If
it has, it cancels the `quiesced_token`, which causes `SerialDvuTask`
to return with `Ok(Shutdown::Quiesced)`, and that same
`CancellationToken` causes the Naxum app in `ChangeSetProcessorTask`
to shut down gracefully.
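
A minimal, self-contained sketch of this coordination is below. The
task, notify, and token names mirror this change, but the bodies are
simplified stand-ins rather than the crate's actual implementation:

```rust
// Sketch only: drain pending "dvu" runs, then honor a quiesced shutdown.
// Assumes tokio (macros, rt-multi-thread, sync, time) and tokio-util.
use std::{sync::Arc, time::Duration};

use tokio::sync::Notify;
use tokio_util::sync::CancellationToken;

#[derive(Debug)]
enum Shutdown {
    Graceful,
    Quiesced,
}

async fn serial_dvu_task(
    run_dvu_notify: Arc<Notify>,
    quiesced_notify: Arc<Notify>,
    quiesced_token: CancellationToken,
    task_token: CancellationToken,
) -> Shutdown {
    loop {
        tokio::select! {
            biased;

            // Pending "dvu" runs win over the quiesced signal, so queued
            // runs are drained before we act on a quiet period.
            _ = run_dvu_notify.notified() => run_dvu().await,
            _ = quiesced_notify.notified() => {
                // Cancelling this token also gracefully shuts down the
                // Naxum app in the processor task.
                quiesced_token.cancel();
                return Shutdown::Quiesced;
            }
            // External shutdown requested by the server.
            _ = task_token.cancelled() => return Shutdown::Graceful,
        }
    }
}

async fn run_dvu() {
    // Stand-in for running a dependent values update job.
    tokio::time::sleep(Duration::from_millis(10)).await;
}

#[tokio::main]
async fn main() {
    let run_dvu_notify = Arc::new(Notify::new());
    let quiesced_notify = Arc::new(Notify::new());
    let quiesced_token = CancellationToken::new();
    let task_token = CancellationToken::new();

    let dvu = tokio::spawn(serial_dvu_task(
        run_dvu_notify.clone(),
        quiesced_notify.clone(),
        quiesced_token.clone(),
        task_token.clone(),
    ));

    // Simulate one pending dvu run, then a detected quiet period.
    run_dvu_notify.notify_one();
    quiesced_notify.notify_one();

    println!("serial dvu task returned: {:?}", dvu.await.unwrap());
    assert!(quiesced_token.is_cancelled());
}
```

The `biased` ordering is just one way to express "drain first"; the
real `SerialDvuTask` may structure its loop differently and returns a
`Result` rather than a bare `Shutdown`.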

With these changes, the one or two remaining "dvu" jobs will not cause
the process task to stop processing further Rebaser requests. For
example, let's assume that the last 2 "dvu" jobs take 8 minutes each.
That means that the process task is in a quiescent shutdown for up to
the next 8 * 2 = 16 minutes, during which time any further Rebaser
requests will also be processed (whereas they may not have been prior to
this change).
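
To see the stream side of this in isolation: the processor task wraps
its incoming Rebaser requests in a timeout, fires `quiesced_notify` on
each elapsed quiet period, and (unlike the previous
`map_while(result::Result::ok)`) uses `filter_map` so the stream keeps
yielding requests that arrive afterwards. The following standalone
sketch illustrates that behavior with an invented channel and timings:

```rust
// Sketch only: a timeout-wrapped stream that survives quiet periods.
// Assumes the futures, tokio, and tokio-stream (time feature) crates.
use std::time::Duration;

use futures::TryStreamExt as _;
use tokio::sync::mpsc;
use tokio_stream::{wrappers::ReceiverStream, StreamExt as _};

#[tokio::main]
async fn main() {
    let (tx, rx) = mpsc::channel::<u32>(8);

    // Simulate requests arriving with a gap longer than the quiescent
    // period (values and timings are arbitrary).
    tokio::spawn(async move {
        tx.send(1).await.ok();
        tokio::time::sleep(Duration::from_millis(50)).await;
        tx.send(2).await.ok(); // shows up only after a quiet period
    });

    let quiescent_period = Duration::from_millis(10);

    let requests = ReceiverStream::new(rx)
        // Items become `Result<u32, Elapsed>`.
        .timeout(quiescent_period)
        // This is where the real code calls `quiesced_notify.notify_one()`;
        // note it may fire more than once while the stream stays quiet.
        .inspect_err(|_elapsed| println!("quiet period detected"))
        // `map_while(Result::ok)` would end the stream at the first
        // timeout; `filter_map` skips it and keeps yielding later items.
        .filter_map(|item| item.ok());
    tokio::pin!(requests);

    while let Some(request) = requests.next().await {
        println!("processed request {request}");
    }
}
```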

Signed-off-by: Fletcher Nichol <[email protected]>

britmyerss committed Jan 17, 2025
1 parent d75899f commit ad53551
Showing 7 changed files with 132 additions and 67 deletions.
1 change: 1 addition & 0 deletions Cargo.lock


1 change: 1 addition & 0 deletions lib/rebaser-server/BUCK
@@ -19,6 +19,7 @@ rust_library(
"//lib/si-settings:si-settings",
"//lib/si-std:si-std",
"//lib/telemetry-nats-rs:telemetry-nats",
"//lib/telemetry-utils-rs:telemetry-utils",
"//lib/telemetry-rs:telemetry",
"//lib/veritech-client:veritech-client",
"//third-party/rust:derive_builder",
1 change: 1 addition & 0 deletions lib/rebaser-server/Cargo.toml
@@ -26,6 +26,7 @@ si-settings = { path = "../../lib/si-settings" }
si-std = { path = "../../lib/si-std" }
telemetry = { path = "../../lib/telemetry-rs" }
telemetry-nats = { path = "../../lib/telemetry-nats-rs" }
telemetry-utils = { path = "../../lib/telemetry-utils-rs" }
veritech-client = { path = "../../lib/veritech-client" }

derive_builder = { workspace = true }
80 changes: 31 additions & 49 deletions lib/rebaser-server/src/change_set_processor_task.rs
@@ -24,6 +24,7 @@ use si_data_nats::{
};
use si_events::{ChangeSetId, WorkspacePk};
use telemetry::prelude::*;
use telemetry_utils::metric;
use thiserror::Error;
use tokio::sync::Notify;
use tokio_stream::StreamExt as _;
@@ -49,7 +50,6 @@ pub(crate) struct ChangeSetProcessorTask {
workspace_id: WorkspacePk,
change_set_id: ChangeSetId,
inner: Box<dyn Future<Output = io::Result<()>> + Unpin + Send>,
quiescence_token: CancellationToken,
}

impl ChangeSetProcessorTask {
@@ -64,8 +64,10 @@ impl ChangeSetProcessorTask {
workspace_id: WorkspacePk,
change_set_id: ChangeSetId,
ctx_builder: DalContextBuilder,
run_notify: Arc<Notify>,
run_dvu_notify: Arc<Notify>,
quiescent_period: Duration,
quiesced_notify: Arc<Notify>,
quiesced_token: CancellationToken,
task_token: CancellationToken,
server_tracker: TaskTracker,
) -> Self {
@@ -78,44 +80,41 @@ impl ChangeSetProcessorTask {
change_set_id,
nats,
ctx_builder,
run_notify,
run_dvu_notify,
server_tracker,
);

let quiescence_token = CancellationToken::new();

let captured = QuiescedCaptured {
instance_id: metadata.instance_id().to_string(),
workspace_id,
change_set_id,
quiescence_token: quiescence_token.clone(),
quiesced_notify: quiesced_notify.clone(),
};

let inactive_aware_incoming = incoming
// Looks for a gap between incoming messages greater than the duration
.timeout(quiescent_period)
// Fire the quiescence token which triggers a distinctive shutdown where we *know* we
// want to remove the task from the set of work.
// Fire quiesced_notify which triggers a specific shutdown of the serial dvu task where
// we *know* we want to remove the task from the set of work.
.inspect_err(move |_elapsed| {
let QuiescedCaptured {
instance_id,
workspace_id,
change_set_id,
quiescence_token,
quiesced_notify,
} = &captured;

debug!(
info!(
service.instance.id = instance_id,
si.workspace.id = %workspace_id,
si.change_set.id = %change_set_id,
"rate of requests has become inactive, shutting down processing tasks",
"rate of requests has become inactive, triggering a quiesced shutdown",
);
quiescence_token.cancel();
// Notify the serial dvu task that we want to shutdown due to a quiet period
quiesced_notify.notify_one();
})
// Once the first inactive period is detected, this stream is closed (i.e. returns
// `None`)
.map_while(result::Result::ok)
.fuse();
// Continue processing messages as normal until the Naxum app's graceful shutdown is
// triggered. This means we turn the stream back from a stream of
// `Result<Result<Message, _>, Elapsed>` into `Result<Message, _>`
.filter_map(|maybe_elapsed_item| maybe_elapsed_item.ok());

let app = ServiceBuilder::new()
.layer(
@@ -135,10 +134,7 @@ impl ChangeSetProcessorTask {

let inner =
naxum::serve_with_incoming_limit(inactive_aware_incoming, app.into_make_service(), 1)
.with_graceful_shutdown(graceful_shutdown_signal(
task_token,
quiescence_token.clone(),
));
.with_graceful_shutdown(graceful_shutdown_signal(task_token, quiesced_token));

let inner_fut = inner.into_future();

@@ -147,44 +143,28 @@ impl ChangeSetProcessorTask {
workspace_id,
change_set_id,
inner: Box::new(inner_fut),
quiescence_token,
}
}

pub(crate) async fn try_run(self) -> Result<Shutdown> {
pub(crate) async fn try_run(self) -> Result<()> {
self.inner.await.map_err(Error::Naxum)?;
metric!(counter.change_set_processor_task.change_set_task = -1);

if self.quiescence_token.is_cancelled() {
debug!(
task = Self::NAME,
si.workspace.id = %self.workspace_id,
si.change_set.id = %self.change_set_id,
"shutdown due to quiescent period",
);
Ok(Shutdown::Quiesced)
} else {
debug!(
task = Self::NAME,
si.workspace.id = %self.workspace_id,
si.change_set.id = %self.change_set_id,
"shutdown complete",
);
Ok(Shutdown::Graceful)
}
info!(
task = Self::NAME,
si.workspace.id = %self.workspace_id,
si.change_set.id = %self.change_set_id,
"shutdown complete",
);
Ok(())
}
}

#[derive(Debug)]
pub(crate) enum Shutdown {
Graceful,
Quiesced,
}

struct QuiescedCaptured {
instance_id: String,
workspace_id: WorkspacePk,
change_set_id: ChangeSetId,
quiescence_token: CancellationToken,
quiesced_notify: Arc<Notify>,
}

#[derive(Clone, Debug)]
@@ -207,7 +187,7 @@ impl post_process::OnSuccess for DeleteMessageOnSuccess {
let stream = self.stream.clone();

Box::pin(async move {
trace!("deleting message on success");
info!("deleting message on success");
if let Err(err) = stream.delete_message(info.stream_sequence).await {
warn!(
si.error.message = ?err,
@@ -305,6 +285,7 @@ mod handlers {
use si_data_nats::HeaderMap;
use telemetry::prelude::*;
use telemetry_nats::propagation;
use telemetry_utils::metric;
use thiserror::Error;

use crate::{
@@ -349,6 +330,7 @@

impl IntoResponse for HandlerError {
fn into_response(self) -> Response {
metric!(counter.change_set_processor_task.failed_rebase = 1);
// TODO(fnichol): there are different responses, esp. for expected interrupted
error!(si.error.message = ?self, "failed to process message");
Response::default_internal_server_error()
23 changes: 15 additions & 8 deletions lib/rebaser-server/src/handlers.rs
@@ -21,8 +21,9 @@ use ulid::Ulid;

use crate::{
app_state::AppState,
change_set_processor_task::{ChangeSetProcessorTask, ChangeSetProcessorTaskError, Shutdown},
change_set_processor_task::{ChangeSetProcessorTask, ChangeSetProcessorTaskError},
serial_dvu_task::{SerialDvuTask, SerialDvuTaskError},
Shutdown,
};

const CONSUMER_NAME_PREFIX: &str = "rebaser-requests";
Expand Down Expand Up @@ -105,7 +106,9 @@ pub(crate) async fn default(State(state): State<AppState>, subject: Subject) ->
// We want to indendently control the lifecyle of our tasks
let tasks_token = CancellationToken::new();

let run_notify = Arc::new(Notify::new());
let run_dvu_notify = Arc::new(Notify::new());
let quiesced_token = CancellationToken::new();
let quiesced_notify = Arc::new(Notify::new());

let incoming = requests_stream
.create_consumer(rebaser_requests_per_change_set_consumer_config(
@@ -126,7 +129,9 @@ pub(crate) async fn default(State(state): State<AppState>, subject: Subject) ->
workspace.id,
change_set.id,
ctx_builder.clone(),
run_notify.clone(),
run_dvu_notify.clone(),
quiesced_notify.clone(),
quiesced_token.clone(),
tasks_token.clone(),
);

@@ -138,8 +143,10 @@ pub(crate) async fn default(State(state): State<AppState>, subject: Subject) ->
workspace.id,
change_set.id,
ctx_builder,
run_notify,
run_dvu_notify,
quiescent_period,
quiesced_notify,
quiesced_token,
tasks_token.clone(),
server_tracker,
);
@@ -166,11 +173,9 @@ pub(crate) async fn default(State(state): State<AppState>, subject: Subject) ->
// Processor task completed
processor_task_result_result = processor_task_result => {
match processor_task_result_result {
// A quiet period was found in the stream; reply `Ok` to ack and remove this task
Ok(Ok(Shutdown::Quiesced)) => Ok(()),
// Processor exited cleanly, but unexpectedly; reply `Err` to nack for task to
// persist and retry
Ok(Ok(Shutdown::Graceful)) => Err(Error::ChangeSetProcessorCompleted),
Ok(Ok(())) => Err(Error::ChangeSetProcessorCompleted),
// Processor exited with error; reply `Err` to nack for task to persist and retry
Ok(Err(err)) => Err(Error::ChangeSetProcessor(err)),
// Tokio join error on processor exit; reply `Err` to nack for task to persist and
@@ -181,9 +186,11 @@ pub(crate) async fn default(State(state): State<AppState>, subject: Subject) ->
// Serial dvu task completed
dvu_task_result_result = dvu_task_result => {
match dvu_task_result_result {
// A quiet period was found in the stream; reply Ok to ack and remove this task
Ok(Ok(Shutdown::Quiesced)) => Ok(()),
// Serial dvu exited cleanly, but unexpectedly; reply `Err` to nack for task to
// persist and retry
Ok(Ok(())) => Err(Error::SerialDvuCompleted),
Ok(Ok(Shutdown::Graceful)) => Err(Error::SerialDvuCompleted),
// Serial dvu exited with error; reply `Err` to nack for task to persist and retry
Ok(Err(err)) => Err(Error::SerialDvu(err)),
// Tokio join error on serial dvu exit; reply `Err` to nack for task to persist and
6 changes: 6 additions & 0 deletions lib/rebaser-server/src/lib.rs
@@ -102,3 +102,9 @@ impl ServerError {
type Error = ServerError;

type Result<T> = std::result::Result<T, ServerError>;

#[derive(Debug)]
pub(crate) enum Shutdown {
Graceful,
Quiesced,
}
