From ad5355185046d6795775e18af20af81876792f61 Mon Sep 17 00:00:00 2001
From: Brit Myers
Date: Wed, 15 Jan 2025 16:32:55 -0500
Subject: [PATCH] feat(rebaser): change quiescent shutdown to reduce missed activity

This change alters the logic that shuts a change set "process" task down
when no Rebaser requests have been seen for our `quiescent_period`.

Prior to this change there was a shutdown window during which the
`ChangeSetProcessorTask` would not be looking for new Rebaser requests to
process while it waited for the `SerialDvuTask` to end. As a hedge against
this scenario, the process task handler checked the change set subject just
before ending to ensure that, if at least one request message remained, we
did not ack/delete the task.

In this altered version of a quiescent shutdown we notice the quiet period
in the Rebaser requests subscription stream as before. However, now a
`quiesced_notify` (`tokio::sync::Notify`) is fired to signal the
`SerialDvuTask`, and the `ChangeSetProcessorTask` continues to process any
further requests that may show up (remember that after running a "dvu" job,
another Rebaser request is often submitted).

Meanwhile, the `SerialDvuTask` continues to run "dvu" jobs as long as
`run_dvu_notify` has been set (in effect "draining" any pending runs), and
only then checks whether `quiesced_notify` has been set. If it has, it
cancels the `quiesced_token`, which causes `SerialDvuTask` to return with
`Ok(Shutdown::Quiesced)`, and that same `CancellationToken` causes the
Naxum app in `ChangeSetProcessorTask` to be gracefully shut down.

With these changes, the one or two remaining "dvu" jobs will not cause the
process task to stop processing further Rebaser requests. For example,
assume that the last 2 "dvu" jobs take 8 minutes each. That means the
process task is in a quiescent shutdown for up to the next 8 * 2 = 16
minutes, during which any further Rebaser requests will still be processed
(whereas they may not have been prior to this change).
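To make the coordination concrete, here is a minimal, self-contained sketch
of the processor side. This is not the rebaser code itself: an mpsc receiver
stands in for the NATS subscription that the Naxum app serves, the `String`
request type and the `processor_loop` name are illustrative, and only the
quiescence signalling is shown.

    use std::{sync::Arc, time::Duration};

    use tokio::sync::{mpsc, Notify};
    use tokio_stream::{wrappers::ReceiverStream, StreamExt as _};
    use tokio_util::sync::CancellationToken;

    async fn processor_loop(
        requests: mpsc::Receiver<String>,
        quiescent_period: Duration,
        quiesced_notify: Arc<Notify>,
        quiesced_token: CancellationToken,
    ) {
        // A gap longer than `quiescent_period` yields an `Err(Elapsed)` item
        // while the underlying stream stays open, so later requests are still
        // seen and processed.
        let incoming = ReceiverStream::new(requests).timeout(quiescent_period);
        tokio::pin!(incoming);

        loop {
            tokio::select! {
                // The serial dvu task decided we are truly quiesced: stop.
                _ = quiesced_token.cancelled() => break,
                maybe_item = incoming.next() => {
                    match maybe_item {
                        // A request arrived in time: handle it as normal.
                        Some(Ok(request)) => {
                            println!("processing rebaser request: {request}");
                        }
                        // Quiet period detected: signal the serial dvu task,
                        // but keep processing requests that still show up.
                        Some(Err(_elapsed)) => quiesced_notify.notify_one(),
                        // The subscription itself closed.
                        None => break,
                    }
                }
            }
        }
    }

In the real task the subscription is served by a Naxum app and shut down via
`with_graceful_shutdown`, but the shape is the same: detecting a quiet period
only notifies the serial dvu task; it no longer stops request processing on
its own.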
Signed-off-by: Fletcher Nichol Uncommited changes --- Cargo.lock | 1 + lib/rebaser-server/BUCK | 1 + lib/rebaser-server/Cargo.toml | 1 + .../src/change_set_processor_task.rs | 80 +++++++---------- lib/rebaser-server/src/handlers.rs | 23 +++-- lib/rebaser-server/src/lib.rs | 6 ++ lib/rebaser-server/src/serial_dvu_task.rs | 87 ++++++++++++++++--- 7 files changed, 132 insertions(+), 67 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index db480a514d..8309cdca80 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5502,6 +5502,7 @@ dependencies = [ "si-std", "telemetry", "telemetry-nats", + "telemetry-utils", "thiserror 2.0.11", "tokio", "tokio-stream", diff --git a/lib/rebaser-server/BUCK b/lib/rebaser-server/BUCK index 074ce838aa..358fc81d99 100644 --- a/lib/rebaser-server/BUCK +++ b/lib/rebaser-server/BUCK @@ -19,6 +19,7 @@ rust_library( "//lib/si-settings:si-settings", "//lib/si-std:si-std", "//lib/telemetry-nats-rs:telemetry-nats", + "//lib/telemetry-utils-rs:telemetry-utils", "//lib/telemetry-rs:telemetry", "//lib/veritech-client:veritech-client", "//third-party/rust:derive_builder", diff --git a/lib/rebaser-server/Cargo.toml b/lib/rebaser-server/Cargo.toml index 4f8861d077..dd6fa2fd83 100644 --- a/lib/rebaser-server/Cargo.toml +++ b/lib/rebaser-server/Cargo.toml @@ -26,6 +26,7 @@ si-settings = { path = "../../lib/si-settings" } si-std = { path = "../../lib/si-std" } telemetry = { path = "../../lib/telemetry-rs" } telemetry-nats = { path = "../../lib/telemetry-nats-rs" } +telemetry-utils = { path = "../../lib/telemetry-utils-rs" } veritech-client = { path = "../../lib/veritech-client" } derive_builder = { workspace = true } diff --git a/lib/rebaser-server/src/change_set_processor_task.rs b/lib/rebaser-server/src/change_set_processor_task.rs index b959443a80..2a56d6677a 100644 --- a/lib/rebaser-server/src/change_set_processor_task.rs +++ b/lib/rebaser-server/src/change_set_processor_task.rs @@ -24,6 +24,7 @@ use si_data_nats::{ }; use si_events::{ChangeSetId, WorkspacePk}; use telemetry::prelude::*; +use telemetry_utils::metric; use thiserror::Error; use tokio::sync::Notify; use tokio_stream::StreamExt as _; @@ -49,7 +50,6 @@ pub(crate) struct ChangeSetProcessorTask { workspace_id: WorkspacePk, change_set_id: ChangeSetId, inner: Box> + Unpin + Send>, - quiescence_token: CancellationToken, } impl ChangeSetProcessorTask { @@ -64,8 +64,10 @@ impl ChangeSetProcessorTask { workspace_id: WorkspacePk, change_set_id: ChangeSetId, ctx_builder: DalContextBuilder, - run_notify: Arc, + run_dvu_notify: Arc, quiescent_period: Duration, + quiesced_notify: Arc, + quiesced_token: CancellationToken, task_token: CancellationToken, server_tracker: TaskTracker, ) -> Self { @@ -78,44 +80,41 @@ impl ChangeSetProcessorTask { change_set_id, nats, ctx_builder, - run_notify, + run_dvu_notify, server_tracker, ); - let quiescence_token = CancellationToken::new(); - let captured = QuiescedCaptured { instance_id: metadata.instance_id().to_string(), workspace_id, change_set_id, - quiescence_token: quiescence_token.clone(), + quiesced_notify: quiesced_notify.clone(), }; - let inactive_aware_incoming = incoming // Looks for a gap between incoming messages greater than the duration .timeout(quiescent_period) - // Fire the quiescence token which triggers a distinctive shutdown where we *know* we - // want to remove the task from the set of work. + // Fire quiesced_notify which triggers a specific shutdown of the serial dvu task where + // we *know* we want to remove the task from the set of work. 
.inspect_err(move |_elapsed| { let QuiescedCaptured { instance_id, workspace_id, change_set_id, - quiescence_token, + quiesced_notify, } = &captured; - - debug!( + info!( service.instance.id = instance_id, si.workspace.id = %workspace_id, si.change_set.id = %change_set_id, - "rate of requests has become inactive, shutting down processing tasks", + "rate of requests has become inactive, triggering a quiesced shutdown", ); - quiescence_token.cancel(); + // Notify the serial dvu task that we want to shutdown due to a quiet period + quiesced_notify.notify_one(); }) - // Once the first inactive period is detected, this stream is closed (i.e. returns - // `None`) - .map_while(result::Result::ok) - .fuse(); + // Continue processing messages as normal until the Naxum app's graceful shutdown is + // triggered. This means we turn the stream back from a stream of + // `Result, Elapsed>` into `Result` + .filter_map(|maybe_elapsed_item| maybe_elapsed_item.ok()); let app = ServiceBuilder::new() .layer( @@ -135,10 +134,7 @@ impl ChangeSetProcessorTask { let inner = naxum::serve_with_incoming_limit(inactive_aware_incoming, app.into_make_service(), 1) - .with_graceful_shutdown(graceful_shutdown_signal( - task_token, - quiescence_token.clone(), - )); + .with_graceful_shutdown(graceful_shutdown_signal(task_token, quiesced_token)); let inner_fut = inner.into_future(); @@ -147,44 +143,28 @@ impl ChangeSetProcessorTask { workspace_id, change_set_id, inner: Box::new(inner_fut), - quiescence_token, } } - pub(crate) async fn try_run(self) -> Result { + pub(crate) async fn try_run(self) -> Result<()> { self.inner.await.map_err(Error::Naxum)?; + metric!(counter.change_set_processor_task.change_set_task = -1); - if self.quiescence_token.is_cancelled() { - debug!( - task = Self::NAME, - si.workspace.id = %self.workspace_id, - si.change_set.id = %self.change_set_id, - "shutdown due to quiescent period", - ); - Ok(Shutdown::Quiesced) - } else { - debug!( - task = Self::NAME, - si.workspace.id = %self.workspace_id, - si.change_set.id = %self.change_set_id, - "shutdown complete", - ); - Ok(Shutdown::Graceful) - } + info!( + task = Self::NAME, + si.workspace.id = %self.workspace_id, + si.change_set.id = %self.change_set_id, + "shutdown complete", + ); + Ok(()) } } -#[derive(Debug)] -pub(crate) enum Shutdown { - Graceful, - Quiesced, -} - struct QuiescedCaptured { instance_id: String, workspace_id: WorkspacePk, change_set_id: ChangeSetId, - quiescence_token: CancellationToken, + quiesced_notify: Arc, } #[derive(Clone, Debug)] @@ -207,7 +187,7 @@ impl post_process::OnSuccess for DeleteMessageOnSuccess { let stream = self.stream.clone(); Box::pin(async move { - trace!("deleting message on success"); + info!("deleting message on success"); if let Err(err) = stream.delete_message(info.stream_sequence).await { warn!( si.error.message = ?err, @@ -305,6 +285,7 @@ mod handlers { use si_data_nats::HeaderMap; use telemetry::prelude::*; use telemetry_nats::propagation; + use telemetry_utils::metric; use thiserror::Error; use crate::{ @@ -349,6 +330,7 @@ mod handlers { impl IntoResponse for HandlerError { fn into_response(self) -> Response { + metric!(counter.change_set_processor_task.failed_rebase = 1); // TODO(fnichol): there are different responses, esp. 
for expected interrupted error!(si.error.message = ?self, "failed to process message"); Response::default_internal_server_error() diff --git a/lib/rebaser-server/src/handlers.rs b/lib/rebaser-server/src/handlers.rs index 4cee08577e..b63ebe5d07 100644 --- a/lib/rebaser-server/src/handlers.rs +++ b/lib/rebaser-server/src/handlers.rs @@ -21,8 +21,9 @@ use ulid::Ulid; use crate::{ app_state::AppState, - change_set_processor_task::{ChangeSetProcessorTask, ChangeSetProcessorTaskError, Shutdown}, + change_set_processor_task::{ChangeSetProcessorTask, ChangeSetProcessorTaskError}, serial_dvu_task::{SerialDvuTask, SerialDvuTaskError}, + Shutdown, }; const CONSUMER_NAME_PREFIX: &str = "rebaser-requests"; @@ -105,7 +106,9 @@ pub(crate) async fn default(State(state): State, subject: Subject) -> // We want to indendently control the lifecyle of our tasks let tasks_token = CancellationToken::new(); - let run_notify = Arc::new(Notify::new()); + let run_dvu_notify = Arc::new(Notify::new()); + let quiesced_token = CancellationToken::new(); + let quiesced_notify = Arc::new(Notify::new()); let incoming = requests_stream .create_consumer(rebaser_requests_per_change_set_consumer_config( @@ -126,7 +129,9 @@ pub(crate) async fn default(State(state): State, subject: Subject) -> workspace.id, change_set.id, ctx_builder.clone(), - run_notify.clone(), + run_dvu_notify.clone(), + quiesced_notify.clone(), + quiesced_token.clone(), tasks_token.clone(), ); @@ -138,8 +143,10 @@ pub(crate) async fn default(State(state): State, subject: Subject) -> workspace.id, change_set.id, ctx_builder, - run_notify, + run_dvu_notify, quiescent_period, + quiesced_notify, + quiesced_token, tasks_token.clone(), server_tracker, ); @@ -166,11 +173,9 @@ pub(crate) async fn default(State(state): State, subject: Subject) -> // Processor task completed processor_task_result_result = processor_task_result => { match processor_task_result_result { - // A quiet period was found in the stream; reply `Ok` to ack and remove this task - Ok(Ok(Shutdown::Quiesced)) => Ok(()), // Processor exited cleanly, but unexpectedly; reply `Err` to nack for task to // persist and retry - Ok(Ok(Shutdown::Graceful)) => Err(Error::ChangeSetProcessorCompleted), + Ok(Ok(())) => Err(Error::ChangeSetProcessorCompleted), // Processor exited with error; reply `Err` to nack for task to persist and retry Ok(Err(err)) => Err(Error::ChangeSetProcessor(err)), // Tokio join error on processor exit; reply `Err` to nack for task to persist and @@ -181,9 +186,11 @@ pub(crate) async fn default(State(state): State, subject: Subject) -> // Serial dvu task completed dvu_task_result_result = dvu_task_result => { match dvu_task_result_result { + // A quiet period was found in the stream; reply Ok to ack and remove this task + Ok(Ok(Shutdown::Quiesced)) => Ok(()), // Serial dvu exited cleanly, but unexpectedly; reply `Err` to nack for task to // persist and retry - Ok(Ok(())) => Err(Error::SerialDvuCompleted), + Ok(Ok(Shutdown::Graceful)) => Err(Error::SerialDvuCompleted), // Serial dvu exited with error; reply `Err` to nack for task to persist and retry Ok(Err(err)) => Err(Error::SerialDvu(err)), // Tokio join error on serial dvu exit; reply `Err` to nack for task to persist and diff --git a/lib/rebaser-server/src/lib.rs b/lib/rebaser-server/src/lib.rs index c756a131c9..e6f81ce2fe 100644 --- a/lib/rebaser-server/src/lib.rs +++ b/lib/rebaser-server/src/lib.rs @@ -102,3 +102,9 @@ impl ServerError { type Error = ServerError; type Result = std::result::Result; + +#[derive(Debug)] +pub(crate) 
enum Shutdown { + Graceful, + Quiesced, +} diff --git a/lib/rebaser-server/src/serial_dvu_task.rs b/lib/rebaser-server/src/serial_dvu_task.rs index 1e37ce5ff3..1283bb684f 100644 --- a/lib/rebaser-server/src/serial_dvu_task.rs +++ b/lib/rebaser-server/src/serial_dvu_task.rs @@ -3,11 +3,12 @@ use std::{result, sync::Arc}; use dal::DalContextBuilder; use si_events::{ChangeSetId, WorkspacePk}; use telemetry::prelude::*; +use telemetry_utils::metric; use thiserror::Error; use tokio::sync::Notify; use tokio_util::sync::CancellationToken; -use crate::ServerMetadata; +use crate::{ServerMetadata, Shutdown}; #[remain::sorted] #[derive(Debug, Error)] @@ -15,6 +16,9 @@ pub(crate) enum SerialDvuTaskError { /// Error when using a DAL context #[error("dal context transaction error: {0}")] DalContext(#[from] dal::TransactionsError), + /// When failing to do an operation using the [`WorkspaceSnapshot`] + #[error("workspace snapshot error: {0}")] + WorkspaceSnapshot(#[from] dal::WorkspaceSnapshotError), } type Result = result::Result; @@ -25,18 +29,23 @@ pub(crate) struct SerialDvuTask { change_set_id: ChangeSetId, ctx_builder: DalContextBuilder, run_notify: Arc, + quiesced_notify: Arc, + quiesced_token: CancellationToken, token: CancellationToken, } impl SerialDvuTask { const NAME: &'static str = "rebaser_server::serial_dvu_task"; + #[allow(clippy::too_many_arguments)] pub(crate) fn create( metadata: Arc, workspace_id: WorkspacePk, change_set_id: ChangeSetId, ctx_builder: DalContextBuilder, run_notify: Arc, + quiesced_notify: Arc, + quiesced_token: CancellationToken, token: CancellationToken, ) -> Self { Self { @@ -45,18 +54,26 @@ impl SerialDvuTask { change_set_id, ctx_builder, run_notify, + quiesced_notify, + quiesced_token, token, } } - pub(crate) async fn try_run(self) -> Result<()> { - loop { + pub(crate) async fn try_run(self) -> Result { + metric!(counter.serial_dvu_task.change_set_in_progress = 1); + + // Attempt to run an initial dvu in case there are processed requests that haven't yet been + // finished with a dvu run + self.maybe_run_initial_dvu().await?; + + let shutdown_cause = loop { tokio::select! 
{ biased; // Signal to run a DVU has fired _ = self.run_notify.notified() => { - debug!( + info!( task = Self::NAME, service.instance.id = self.metadata.instance_id(), si.workspace.id = %self.workspace_id, @@ -65,28 +82,47 @@ impl SerialDvuTask { ); self.run_dvu().await?; } + // Signal to shutdown from a quiet period has fired + _ = self.quiesced_notify.notified() => { + info!( + task = Self::NAME, + service.instance.id = self.metadata.instance_id(), + si.workspace.id = %self.workspace_id, + si.change_set.id = %self.change_set_id, + "quiesced notified, starting to shut down", + ); + + // Fire the quiesced_token so that the processing task immediately stops + // processing additional requests + self.quiesced_token.cancel(); + + break Shutdown::Quiesced; + } // Cancellation token has fired, time to shut down _ = self.token.cancelled() => { - debug!( + info!( task = Self::NAME, service.instance.id = self.metadata.instance_id(), si.workspace.id = %self.workspace_id, si.change_set.id = %self.change_set_id, - "received cancellation", + "received cancellation, shutting down", ); - break; + break Shutdown::Graceful; } } - } + }; - debug!( + info!( task = Self::NAME, + cause = ?shutdown_cause, service.instance.id = self.metadata.instance_id(), si.workspace.id = %self.workspace_id, si.change_set.id = %self.change_set_id, "shutdown complete", ); - Ok(()) + metric!(counter.serial_dvu_task.change_set_in_progress = -1); + + Ok(shutdown_cause) } #[instrument( @@ -100,6 +136,8 @@ impl SerialDvuTask { ), )] async fn run_dvu(&self) -> Result<()> { + metric!(counter.serial_dvu_task.dvu_running = 1); + let builder = self.ctx_builder.clone(); let ctx = builder .build_for_change_set_as_system(self.workspace_id, self.change_set_id, None) @@ -107,6 +145,35 @@ impl SerialDvuTask { ctx.enqueue_dependent_values_update().await?; ctx.blocking_commit_no_rebase().await?; + metric!(counter.serial_dvu_task.dvu_running = -1); + + Ok(()) + } + + async fn maybe_run_initial_dvu(&self) -> Result<()> { + let builder = self.ctx_builder.clone(); + let ctx = builder + .build_for_change_set_as_system(self.workspace_id, self.change_set_id, None) + .await?; + + if ctx + .workspace_snapshot()? + .has_dependent_value_roots() + .await? + { + metric!(counter.serial_dvu_task.initial_dvu_running = 1); + + info!( + task = Self::NAME, + service.instance.id = self.metadata.instance_id(), + si.workspace.id = %self.workspace_id, + si.change_set.id = %self.change_set_id, + "enqueuing *initial* dependent_values_update", + ); + ctx.enqueue_dependent_values_update().await?; + ctx.blocking_commit_no_rebase().await?; + metric!(counter.serial_dvu_task.initial_dvu_running = -1); + } Ok(()) }
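The drain-then-quiesce behaviour in `SerialDvuTask` hinges on the `biased;`
mode of `tokio::select!`, which polls branches in declaration order. Below is
a small runnable sketch under the same caveat as above: `serial_dvu_loop` and
`run_dvu` are stand-ins for the real task, and the initial "maybe run a dvu"
pass is omitted.

    use std::sync::Arc;

    use tokio::sync::Notify;
    use tokio_util::sync::CancellationToken;

    #[derive(Debug)]
    enum Shutdown {
        Graceful,
        Quiesced,
    }

    async fn serial_dvu_loop(
        run_dvu_notify: Arc<Notify>,
        quiesced_notify: Arc<Notify>,
        quiesced_token: CancellationToken,
        task_token: CancellationToken,
    ) -> Shutdown {
        loop {
            tokio::select! {
                // Poll branches in declaration order: a pending dvu run is
                // always drained before the quiesced signal is honored.
                biased;

                _ = run_dvu_notify.notified() => {
                    run_dvu().await;
                }
                _ = quiesced_notify.notified() => {
                    // No dvu runs pending and the processor saw a quiet
                    // period: stop the processor too and report it.
                    quiesced_token.cancel();
                    break Shutdown::Quiesced;
                }
                _ = task_token.cancelled() => break Shutdown::Graceful,
            }
        }
    }

    // Stand-in for enqueuing a dependent_values_update and committing it.
    async fn run_dvu() {}

    #[tokio::main]
    async fn main() {
        let run_dvu_notify = Arc::new(Notify::new());
        let quiesced_notify = Arc::new(Notify::new());
        let quiesced_token = CancellationToken::new();
        let task_token = CancellationToken::new();

        // Pretend one dvu run is pending and the processor has gone quiet.
        run_dvu_notify.notify_one();
        quiesced_notify.notify_one();

        let cause = serial_dvu_loop(
            run_dvu_notify,
            quiesced_notify,
            quiesced_token.clone(),
            task_token,
        )
        .await;

        // The pending dvu drains first, then the quiesced shutdown is
        // reported and the processor-side token is cancelled.
        assert!(matches!(cause, Shutdown::Quiesced));
        assert!(quiesced_token.is_cancelled());
    }

Because `run_dvu_notify` is checked first, a dvu that was requested while the
quiet period elapsed still runs before shutdown, which is what keeps the last
one or two dvu jobs from being missed while requests continue to be served.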
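Finally, the ack/nack decision in the handler moves with the `Shutdown`
type: a quiesced shutdown is now reported by the serial dvu task rather than
the processor task. A reduced sketch of that mapping, assuming the existing
convention that replying `Ok` acks and deletes the work message while `Err`
nacks it so the task persists and retries (`HandlerError` and
`reply_for_serial_dvu` are illustrative names; the `Shutdown` variants and
the "completed cleanly but unexpectedly" case mirror the patch):

    #[derive(Debug)]
    enum Shutdown {
        Graceful,
        Quiesced,
    }

    #[derive(Debug)]
    enum HandlerError {
        // The serial dvu task exited cleanly, but unexpectedly.
        SerialDvuCompleted,
    }

    fn reply_for_serial_dvu(outcome: Shutdown) -> Result<(), HandlerError> {
        match outcome {
            // A quiet period was found: ack so this task is removed.
            Shutdown::Quiesced => Ok(()),
            // Nack so the task persists and is retried.
            Shutdown::Graceful => Err(HandlerError::SerialDvuCompleted),
        }
    }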