Skip to content

Commit

Permalink
Add "migration-mode" to remove workspace snapshots unused by any active change sets
Browse files Browse the repository at this point in the history

While we do evict snapshots when a change set pointer moves, we don't do
anything when a change set's status is no longer one of the "active"
statuses. We also do not currently allow setting the status of a change
set back to one of the "active" statuses once it has been moved out of
being "active".

Because a change set is forever "inactive" once it has become inactive,
it is not necessary to keep the snapshot around, unless it is also being
used by an "active" change set.

This new migration-mode (`garbageCollectSnapshots`) queries all "active"
change sets to gather their workspace snapshot addresses, and all of the
workspace snapshot addresses in the LayerDb. Any workspace snapshot that
is at least an hour old, and is _NOT_ referenced by an "active" change
set is removed. We only consider snapshots older than an hour to avoid
race conditions where a change set might have been created or modified
after we query the `change_set_pointers` table but before we query the
`workspace_snapshots` table. Because the tables are in separate
databases, we can't rely on normal transactional integrity.
  • Loading branch information
jhelwig committed Feb 21, 2025
1 parent e8e3b1c commit 9d8b75f
Show file tree
Hide file tree
Showing 5 changed files with 231 additions and 3 deletions.
43 changes: 42 additions & 1 deletion bin/sdf/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

use std::{path::PathBuf, time::Duration};

use sdf_server::{key_generation, Config, Migrator, Server};
use sdf_server::{key_generation, Config, Migrator, Server, SnapshotGarbageCollector};
use si_service::{
color_eyre,
prelude::*,
Expand Down Expand Up @@ -103,6 +103,18 @@ async fn async_main() -> Result<()> {
telemetry_shutdown,
)
.await
} else if config.migration_mode().is_garbage_collect_snapshots() {
garbage_collect_snapshots(
config,
main_tracker,
main_token,
helping_tasks_tracker,
helping_tasks_token,
telemetry_tracker,
telemetry_token,
telemetry_shutdown,
)
.await
} else {
run_server(
config,
Expand Down Expand Up @@ -194,6 +206,35 @@ async fn migrate_and_quit(
.map_err(Into::into)
}

#[inline]
#[allow(clippy::too_many_arguments)]
async fn garbage_collect_snapshots(
config: Config,
main_tracker: TaskTracker,
main_token: CancellationToken,
helping_tasks_tracker: TaskTracker,
helping_tasks_token: CancellationToken,
telemetry_tracker: TaskTracker,
telemetry_token: CancellationToken,
telemetry_shutdown: TelemetryShutdownGuard,
) -> Result<()> {
let garbage_collector =
SnapshotGarbageCollector::new(config, &helping_tasks_tracker, helping_tasks_token.clone())
.await?;

let handle = main_tracker.spawn(garbage_collector.garbage_collect_snapshots());

shutdown::graceful_with_handle(handle)
.group(main_tracker, main_token)
.group(helping_tasks_tracker, helping_tasks_token)
.group(telemetry_tracker, telemetry_token)
.telemetry_guard(telemetry_shutdown.into_future())
.timeout(GRACEFUL_SHUTDOWN_TIMEOUT)
.wait()
.await
.map_err(Into::into)
}

#[inline]
async fn generate_veritech_key_pair(
secret_key_path: PathBuf,
Expand Down
15 changes: 13 additions & 2 deletions lib/dal/src/change_set/status.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,23 @@
use serde::{Deserialize, Serialize};
use si_data_pg::postgres_types::ToSql;
use strum::{AsRefStr, Display, EnumString};
use strum::{AsRefStr, Display, EnumIter, EnumString};

// NOTE(nick): if we can remove the "ToSql" trait, then we can fully move this to "si-events-rs"
// and delete the duplicate types.
#[remain::sorted]
#[derive(
AsRefStr, Deserialize, Serialize, Debug, Display, EnumString, PartialEq, Eq, Copy, Clone, ToSql,
AsRefStr,
Deserialize,
Serialize,
Debug,
Display,
EnumString,
PartialEq,
Eq,
Copy,
Clone,
ToSql,
EnumIter,
)]
pub enum ChangeSetStatus {
/// No longer usable
Expand Down
13 changes: 13 additions & 0 deletions lib/dal/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,7 @@ pub fn generate_name() -> String {
)]
#[strum(serialize_all = "camelCase")]
pub enum MigrationMode {
GarbageCollectSnapshots,
Run,
RunAndQuit,
Skip,
Expand All @@ -280,6 +281,10 @@ impl MigrationMode {
pub fn is_run_and_quit(&self) -> bool {
matches!(self, Self::RunAndQuit)
}

pub fn is_garbage_collect_snapshots(&self) -> bool {
matches!(self, Self::GarbageCollectSnapshots)
}
}

#[cfg(test)]
Expand All @@ -293,13 +298,21 @@ mod tests {

#[test]
fn display() {
assert_eq!(
"garbageCollectSnapshots",
MigrationMode::GarbageCollectSnapshots.to_string()
);
assert_eq!("run", MigrationMode::Run.to_string());
assert_eq!("runAndQuit", MigrationMode::RunAndQuit.to_string());
assert_eq!("skip", MigrationMode::Skip.to_string());
}

#[test]
fn from_str() {
assert_eq!(
MigrationMode::GarbageCollectSnapshots,
"garbageCollectSnapshots".parse().expect("failed to parse")
);
assert_eq!(MigrationMode::Run, "run".parse().expect("failed to parse"));
assert_eq!(
MigrationMode::RunAndQuit,
Expand Down
161 changes: 161 additions & 0 deletions lib/sdf-server/src/garbage_collection.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
use std::{collections::HashSet, future::IntoFuture as _};

use dal::{ChangeSetStatus, ServicesContext, TransactionsError, WorkspaceSnapshotAddress};
use si_data_pg::{PgError, PgPoolError};
use si_layer_cache::LayerDbError;
use strum::IntoEnumIterator;
use telemetry::prelude::*;
use thiserror::Error;
use tokio_util::{sync::CancellationToken, task::TaskTracker};

use crate::{init, Config};

#[remain::sorted]
#[derive(Debug, Error)]
pub enum GarbageCollectorError {
#[error("error while initializing: {0}")]
Init(#[from] init::InitError),
#[error("Layer DB error: {0}")]
LayerDb(#[from] LayerDbError),
#[error("Pg error: {0}")]
Pg(#[from] PgError),
#[error("Pg Pool error: {0}")]
PgPool(#[from] PgPoolError),
#[error("Transactions error: {0}")]
Transactions(#[from] TransactionsError),
#[error("Unable to query workspace snapshots")]
UnableToQuerySnapshots,
}

type Result<T> = std::result::Result<T, GarbageCollectorError>;

pub struct SnapshotGarbageCollector {
services_context: ServicesContext,
}

impl SnapshotGarbageCollector {
#[instrument(name = "sdf.snapshot_garbage_collector.new", level = "info", skip_all)]
pub async fn new(
config: Config,
task_tracker: &TaskTracker,
task_token: CancellationToken,
) -> Result<Self> {
let (services_context, layer_db_graceful_shutdown) =
init::services_context_from_config(&config, task_token).await?;

task_tracker.spawn(layer_db_graceful_shutdown.into_future());

Ok(Self { services_context })
}

#[instrument(
name = "sdf.snapshot_garbage_collector.garbage_collect_snapshots",
level = "info",
skip_all
)]
pub async fn garbage_collect_snapshots(self) -> Result<()> {
let span = current_span_for_instrument_at!("info");

let dal_context = self.services_context.clone().into_builder(true);
let ctx = dal_context
.build_default(None)
.await
.map_err(|err| span.record_err(err))?;
let ctx = &ctx;

let mut open_change_set_snapshot_ids = HashSet::new();
let mut all_snapshot_ids = HashSet::new();

// Gather the WorkspaceSnapshotAddress of all open change sets.
let open_statuses: Vec<String> = ChangeSetStatus::iter()
.filter_map(|status| {
if status.is_active() {
Some(status.to_string())
} else {
None
}
})
.collect();
info!(
"Change set status(es) to consider 'active': {:?}",
&open_statuses
);
let change_set_snapshot_rows = ctx.txns().await?.pg().query(
"SELECT workspace_snapshot_address AS snapshot_id FROM change_set_pointers WHERE status = ANY($1::text[]) GROUP BY workspace_snapshot_address",
&[&open_statuses],
).await?;
for row in change_set_snapshot_rows {
let snapshot_id: WorkspaceSnapshotAddress = row.try_get("snapshot_id")?;
open_change_set_snapshot_ids.insert(snapshot_id);
}
info!(
"Found {} distinct snapshot address(es) for open change sets.",
open_change_set_snapshot_ids.len()
);

// Gather the WorkspaceSnapshotAddress of all existing snapshots that are
// at least an hour old.
//
// By only considering the snapshots that are at least an hour old, we avoid
// race conditions where a change set is created, or modified between when we
// queried the change_set_pointers table and the workspace_snapshots table.
// We can't rely on transactional integrity to avoid race conditions as the
// tables are in completely separate databases.
let snapshot_id_rows = ctx
.layer_db()
.workspace_snapshot()
.cache
.pg()
.query(
"SELECT key AS snapshot_id
FROM workspace_snapshots
WHERE created_at < NOW() - '1 hour'::interval
GROUP BY key
ORDER BY created_at
LIMIT 10000",
&[],
)
.await?
.ok_or_else(|| GarbageCollectorError::UnableToQuerySnapshots)?;
for row in snapshot_id_rows {
let snapshot_id: WorkspaceSnapshotAddress = row.try_get("snapshot_id")?;
all_snapshot_ids.insert(snapshot_id);
}
info!(
"Found {} distinct snapshot address(es) older than cutoff.",
all_snapshot_ids.len()
);

// Any WorkspaceSnapshotAddress not in both open_change_set_snapshot_ids
// and all_snapshot_ids is for a closed/applied/abandoned change set and
// can be deleted, as we do not allow re-opening change sets.
let snapshot_ids_to_delete: HashSet<_> = all_snapshot_ids
.difference(&open_change_set_snapshot_ids)
.collect();
info!(
"Found {} snapshot address(es) only used by inactive change sets.",
snapshot_ids_to_delete.len()
);

let mut counter = 0;
for key in snapshot_ids_to_delete {
ctx.layer_db()
.workspace_snapshot()
.cache
.pg()
.delete(&key.to_string())
.await?;

counter += 1;
if counter % 100 == 0 {
info!("Deleted {} snapshot addresses.", counter);
}
}
info!("Deleted {} snapshot address(es).", counter);

ctx.commit().await?;

span.record_ok();
Ok(())
}
}
2 changes: 2 additions & 0 deletions lib/sdf-server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ mod app_state;
mod config;
pub mod dal_wrapper;
mod extract;
mod garbage_collection;
mod init;
pub mod key_generation;
pub mod middleware;
Expand All @@ -42,6 +43,7 @@ pub use self::{
Config, ConfigBuilder, ConfigError, ConfigFile, IncomingStream, MigrationMode,
StandardConfig, StandardConfigFile, WorkspacePermissions, WorkspacePermissionsMode,
},
garbage_collection::SnapshotGarbageCollector,
migrations::Migrator,
nats_multiplexer::CRDT_MULTIPLEXER_SUBJECT,
server::{Server, ServerMetadata, ServerSocket},
Expand Down

0 comments on commit 9d8b75f

Please sign in to comment.