Add "migration-mode" to remove workspace snapshots unused by any active change sets #5523

Merged · 1 commit · Feb 21, 2025
43 changes: 42 additions & 1 deletion bin/sdf/src/main.rs
@@ -2,7 +2,7 @@

use std::{path::PathBuf, time::Duration};

use sdf_server::{key_generation, Config, Migrator, Server};
use sdf_server::{key_generation, Config, Migrator, Server, SnapshotGarbageCollector};
use si_service::{
color_eyre,
prelude::*,
@@ -103,6 +103,18 @@ async fn async_main() -> Result<()> {
telemetry_shutdown,
)
.await
} else if config.migration_mode().is_garbage_collect_snapshots() {
garbage_collect_snapshots(
config,
main_tracker,
main_token,
helping_tasks_tracker,
helping_tasks_token,
telemetry_tracker,
telemetry_token,
telemetry_shutdown,
)
.await
} else {
run_server(
config,
@@ -194,6 +206,35 @@ async fn migrate_and_quit(
.map_err(Into::into)
}

#[inline]
#[allow(clippy::too_many_arguments)]
async fn garbage_collect_snapshots(
config: Config,
main_tracker: TaskTracker,
main_token: CancellationToken,
helping_tasks_tracker: TaskTracker,
helping_tasks_token: CancellationToken,
telemetry_tracker: TaskTracker,
telemetry_token: CancellationToken,
telemetry_shutdown: TelemetryShutdownGuard,
) -> Result<()> {
let garbage_collector =
SnapshotGarbageCollector::new(config, &helping_tasks_tracker, helping_tasks_token.clone())
.await?;

let handle = main_tracker.spawn(garbage_collector.garbage_collect_snapshots());

shutdown::graceful_with_handle(handle)
.group(main_tracker, main_token)
.group(helping_tasks_tracker, helping_tasks_token)
.group(telemetry_tracker, telemetry_token)
.telemetry_guard(telemetry_shutdown.into_future())
.timeout(GRACEFUL_SHUTDOWN_TIMEOUT)
.wait()
.await
.map_err(Into::into)
}

#[inline]
async fn generate_veritech_key_pair(
secret_key_path: PathBuf,
15 changes: 13 additions & 2 deletions lib/dal/src/change_set/status.rs
@@ -1,12 +1,23 @@
use serde::{Deserialize, Serialize};
use si_data_pg::postgres_types::ToSql;
use strum::{AsRefStr, Display, EnumString};
use strum::{AsRefStr, Display, EnumIter, EnumString};

// NOTE(nick): if we can remove the "ToSql" trait, then we can fully move this to "si-events-rs"
// and delete the duplicate types.
#[remain::sorted]
#[derive(
AsRefStr, Deserialize, Serialize, Debug, Display, EnumString, PartialEq, Eq, Copy, Clone, ToSql,
AsRefStr,
Deserialize,
Serialize,
Debug,
Display,
EnumString,
PartialEq,
Eq,
Copy,
Clone,
ToSql,
EnumIter,
)]
pub enum ChangeSetStatus {
/// No longer usable
13 changes: 13 additions & 0 deletions lib/dal/src/lib.rs
@@ -256,6 +256,7 @@ pub fn generate_name() -> String {
)]
#[strum(serialize_all = "camelCase")]
pub enum MigrationMode {
GarbageCollectSnapshots,
Contributor: I like this!

Run,
RunAndQuit,
Skip,
@@ -280,6 +281,10 @@ impl MigrationMode {
pub fn is_run_and_quit(&self) -> bool {
matches!(self, Self::RunAndQuit)
}

pub fn is_garbage_collect_snapshots(&self) -> bool {
matches!(self, Self::GarbageCollectSnapshots)
}
}

#[cfg(test)]
@@ -293,13 +298,21 @@ mod tests {

#[test]
fn display() {
assert_eq!(
"garbageCollectSnapshots",
MigrationMode::GarbageCollectSnapshots.to_string()
);
assert_eq!("run", MigrationMode::Run.to_string());
assert_eq!("runAndQuit", MigrationMode::RunAndQuit.to_string());
assert_eq!("skip", MigrationMode::Skip.to_string());
}

#[test]
fn from_str() {
assert_eq!(
MigrationMode::GarbageCollectSnapshots,
"garbageCollectSnapshots".parse().expect("failed to parse")
);
assert_eq!(MigrationMode::Run, "run".parse().expect("failed to parse"));
assert_eq!(
MigrationMode::RunAndQuit,
159 changes: 159 additions & 0 deletions lib/sdf-server/src/garbage_collection.rs
@@ -0,0 +1,159 @@
use std::{collections::HashSet, future::IntoFuture as _};

use dal::{ChangeSetStatus, ServicesContext, TransactionsError, WorkspaceSnapshotAddress};
use si_data_pg::{PgError, PgPoolError};
use si_layer_cache::LayerDbError;
use strum::IntoEnumIterator;
use telemetry::prelude::*;
use thiserror::Error;
use tokio_util::{sync::CancellationToken, task::TaskTracker};

use crate::{init, Config};

#[remain::sorted]
#[derive(Debug, Error)]
pub enum GarbageCollectorError {
#[error("error while initializing: {0}")]
Init(#[from] init::InitError),
#[error("Layer DB error: {0}")]
LayerDb(#[from] LayerDbError),
#[error("Pg error: {0}")]
Pg(#[from] PgError),
#[error("Pg Pool error: {0}")]
PgPool(#[from] PgPoolError),
#[error("Transactions error: {0}")]
Transactions(#[from] TransactionsError),
#[error("Unable to query workspace snapshots")]
UnableToQuerySnapshots,
}

type Result<T> = std::result::Result<T, GarbageCollectorError>;

pub struct SnapshotGarbageCollector {
services_context: ServicesContext,
}

impl SnapshotGarbageCollector {
#[instrument(name = "sdf.snapshot_garbage_collector.new", level = "info", skip_all)]
pub async fn new(
config: Config,
task_tracker: &TaskTracker,
task_token: CancellationToken,
) -> Result<Self> {
let (services_context, layer_db_graceful_shutdown) =
init::services_context_from_config(&config, task_token).await?;

task_tracker.spawn(layer_db_graceful_shutdown.into_future());

Ok(Self { services_context })
}

#[instrument(
name = "sdf.snapshot_garbage_collector.garbage_collect_snapshots",
level = "info",
skip_all
)]
pub async fn garbage_collect_snapshots(self) -> Result<()> {
let span = current_span_for_instrument_at!("info");

let dal_context = self.services_context.clone().into_builder(true);
let ctx = dal_context
.build_default(None)
.await
.map_err(|err| span.record_err(err))?;
let ctx = &ctx;

let mut open_change_set_snapshot_ids = HashSet::new();
let mut all_snapshot_ids = HashSet::new();

// Gather the WorkspaceSnapshotAddress of all open change sets.
let open_statuses: Vec<String> = ChangeSetStatus::iter()
.filter_map(|status| {
if status.is_active() {
Contributor: Is it possible that this isn't safe if there is active work going on in the system? i.e., would we have to ensure that there are no change sets being created/applied/deleted while this mode is being executed?

Contributor: Aware there's a 1h window below, but I'm wondering whether it's still possible if this takes over an hour.

Contributor (Author): The danger would be:

  • Snapshot X created more than an hour ago.
  • Snapshot X is not used by any "active" change sets at the time we query the change_set_pointers table.
  • We query change_set_pointers to gather the snapshot addresses.
  • Change Set B is created/modified after we queried, is "active", and is now referencing Snapshot X.
  • The query of workspace_snapshots to gather the snapshot addresses could happen either before or after Change Set B is created/modified to reference Snapshot X.

I think it's technically possible, but the possibility of it happening without specifically trying is pretty small.
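To make the condition discussed in this thread concrete, here is a minimal, illustrative Rust sketch (not part of the PR; the function and parameter names are hypothetical) of what makes a snapshot a deletion candidate, and where the residual window sits:

use std::time::{Duration, SystemTime};

/// Hypothetical illustration of the candidate condition this mode applies:
/// a snapshot is only deletable if it is older than the cutoff AND was not
/// referenced by any active change set at the time change_set_pointers was
/// queried. A change set that starts referencing the snapshot *after* that
/// query is the residual race discussed above.
fn is_deletion_candidate(
    snapshot_created_at: SystemTime,
    referenced_by_active_change_set: bool,
) -> bool {
    // Mirrors the SQL cutoff used below: created_at < NOW() - '1 hour'::interval
    let cutoff = Duration::from_secs(60 * 60);
    let old_enough = snapshot_created_at
        .elapsed()
        .map(|age| age > cutoff)
        .unwrap_or(false);
    old_enough && !referenced_by_active_change_set
}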

Some(status.to_string())
} else {
None
}
})
.collect();
info!(
"Change set status(es) to consider 'active': {:?}",
&open_statuses
);
let change_set_snapshot_rows = ctx.txns().await?.pg().query(
"SELECT workspace_snapshot_address AS snapshot_id FROM change_set_pointers WHERE status = ANY($1::text[]) GROUP BY workspace_snapshot_address",
&[&open_statuses],
).await?;
for row in change_set_snapshot_rows {
let snapshot_id: WorkspaceSnapshotAddress = row.try_get("snapshot_id")?;
open_change_set_snapshot_ids.insert(snapshot_id);
}
info!(
"Found {} distinct snapshot address(es) for open change sets.",
open_change_set_snapshot_ids.len()
);

// Gather the WorkspaceSnapshotAddress of all existing snapshots that are
// at least an hour old.
//
// By only considering the snapshots that are at least an hour old, we avoid
// race conditions where a change set is created, or modified between when we
// queried the change_set_pointers table and the workspace_snapshots table.
// We can't rely on transactional integrity to avoid race conditions as the
// tables are in completely separate databases.
let snapshot_id_rows = ctx
.layer_db()
.workspace_snapshot()
.cache
.pg()
.query(
"SELECT key AS snapshot_id
FROM workspace_snapshots
WHERE created_at < NOW() - '1 hour'::interval
GROUP BY key",
&[],
)
Contributor: Confirmed that this is running a SQL query on a pg client (out of the pool) in a non-transaction query. No other side effects!

.await?
.ok_or_else(|| GarbageCollectorError::UnableToQuerySnapshots)?;
for row in snapshot_id_rows {
let snapshot_id: WorkspaceSnapshotAddress = row.try_get("snapshot_id")?;
all_snapshot_ids.insert(snapshot_id);
}
info!(
"Found {} distinct snapshot address(es) older than cutoff.",
all_snapshot_ids.len()
);

// Any WorkspaceSnapshotAddress not in both open_change_set_snapshot_ids
// and all_snapshot_ids is for a closed/applied/abandoned change set and
// can be deleted, as we do not allow re-opening change sets.
let snapshot_ids_to_delete: HashSet<_> = all_snapshot_ids
.difference(&open_change_set_snapshot_ids)
.collect();
info!(
"Found {} snapshot address(es) only used by inactive change sets.",
snapshot_ids_to_delete.len()
);

let mut counter = 0;
for key in snapshot_ids_to_delete.iter().take(10_000) {
Contributor (Author): Briefly had this as a LIMIT 10000 in the SQL query, but realized that had the potential to limit the rows returned to only those referenced by active change sets while there might be candidates for deletion outside of the LIMIT. By bounding the number of items we process after having fully calculated the list, we should avoid the problem of potentially stalling out while there is work that could be done.

ctx.layer_db()
.workspace_snapshot()
.cache
.pg()
.delete(&key.to_string())
.await?;
Contributor: The delete here is also a transaction-less delete query to the database.

counter += 1;
if counter % 100 == 0 {
info!("Deleted {} snapshot addresses.", counter);
}
}
info!("Deleted {} snapshot address(es).", counter);

ctx.commit().await?;
Contributor: This shouldn't have any interaction with the layer cache's database, only the DAL db and any pending WS events.


span.record_ok();
Ok(())
}
}
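Relating to the author's note above about moving the bound out of the SQL query: a minimal, illustrative Rust sketch (not part of the PR; names are hypothetical) of why applying the limit after the full set difference guarantees progress, whereas a LIMIT in the snapshot query could return only keys that are still referenced:

use std::collections::HashSet;

/// Hypothetical illustration: compute the full difference first, then bound
/// how many deletions happen in one run. If the bound were applied in SQL
/// (LIMIT 10000 on workspace_snapshots), every returned key could be one
/// still referenced by an active change set, leaving deletable snapshots
/// beyond the LIMIT untouched and the run making no progress.
fn deletions_for_this_run<'a>(
    all_old_snapshots: &'a HashSet<String>,
    active_snapshots: &'a HashSet<String>,
    max_per_run: usize,
) -> Vec<&'a String> {
    all_old_snapshots
        .difference(active_snapshots)
        .take(max_per_run)
        .collect()
}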
2 changes: 2 additions & 0 deletions lib/sdf-server/src/lib.rs
@@ -23,6 +23,7 @@ mod app_state;
mod config;
pub mod dal_wrapper;
mod extract;
mod garbage_collection;
mod init;
pub mod key_generation;
pub mod middleware;
@@ -42,6 +43,7 @@ pub use self::{
Config, ConfigBuilder, ConfigError, ConfigFile, IncomingStream, MigrationMode,
StandardConfig, StandardConfigFile, WorkspacePermissions, WorkspacePermissionsMode,
},
garbage_collection::SnapshotGarbageCollector,
migrations::Migrator,
nats_multiplexer::CRDT_MULTIPLEXER_SUBJECT,
server::{Server, ServerMetadata, ServerSocket},