Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ENH] Additional prop tests for GC. #3883

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions rust/garbage_collector/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -52,3 +52,4 @@ chroma-cache = { workspace = true }
[dev-dependencies]
proptest = { workspace = true }
rand = { workspace = true }
tokio-test = "0.4"
228 changes: 209 additions & 19 deletions rust/garbage_collector/src/operators/compute_versions_to_delete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
use chroma_system::{Operator, OperatorType};
use chroma_types::chroma_proto::{CollectionVersionFile, VersionListForCollection};
use chrono::{DateTime, Utc};
use rand::Rng;

Check failure on line 6 in rust/garbage_collector/src/operators/compute_versions_to_delete.rs

View workflow job for this annotation

GitHub Actions / Rust tests / can-build-release

unresolved import `rand`

Check failure on line 6 in rust/garbage_collector/src/operators/compute_versions_to_delete.rs

View workflow job for this annotation

GitHub Actions / Lint

unresolved import `rand`

Check failure on line 6 in rust/garbage_collector/src/operators/compute_versions_to_delete.rs

View workflow job for this annotation

GitHub Actions / Rust tests / test-benches (depot-ubuntu-22.04-16)

unresolved import `rand`

Check failure on line 6 in rust/garbage_collector/src/operators/compute_versions_to_delete.rs

View workflow job for this annotation

GitHub Actions / Rust tests / test (depot-ubuntu-22.04)

unresolved import `rand`

Check failure on line 6 in rust/garbage_collector/src/operators/compute_versions_to_delete.rs

View workflow job for this annotation

GitHub Actions / Rust tests / test-integration (depot-ubuntu-22.04-16)

unresolved import `rand`
use thiserror::Error;

#[derive(Clone, Debug)]
Expand Down Expand Up @@ -52,8 +53,6 @@
&self,
input: &ComputeVersionsToDeleteInput,
) -> Result<ComputeVersionsToDeleteOutput, ComputeVersionsToDeleteError> {
tracing::info!("Starting compute versions to delete");

let mut version_file = input.version_file.clone();
let collection_info = version_file
.collection_info_immutable
Expand All @@ -63,54 +62,41 @@
ComputeVersionsToDeleteError::ComputeError("Missing collection info".to_string())
})?;

tracing::info!(
tracing::debug!(
tenant = %collection_info.tenant_id,
database = %collection_info.database_id,
collection = %collection_info.collection_id,
"Processing collection"
"Processing collection to compute versions to delete"
);

let mut marked_versions = Vec::new();
let mut oldest_version_to_keep = 0;

if let Some(ref mut version_history) = version_file.version_history {
tracing::info!(
tracing::debug!(
"Processing {} versions in history",
version_history.versions.len()
);

let mut unique_versions_seen = 0;
let mut last_version = None;
let mut oldest_version_min_criteria = None;

// First pass: find the oldest version that must be kept
for version in version_history.versions.iter().rev() {
if last_version != Some(version.version) {
unique_versions_seen += 1;
oldest_version_min_criteria = Some(version.version);
oldest_version_to_keep = version.version;
tracing::debug!(
version = version.version,
unique_versions = unique_versions_seen,
"Processing version"
);
if unique_versions_seen == input.min_versions_to_keep {
break;
}
last_version = Some(version.version);
}
}

tracing::info!(
oldest_version = ?oldest_version_min_criteria,
min_versions = input.min_versions_to_keep,
"Found oldest version to keep"
);

// Second pass: mark for deletion if older than oldest_kept AND before cutoff
for version in version_history.versions.iter_mut() {
if version.version != 0
&& version.version < oldest_version_min_criteria.unwrap_or(i64::MAX)
&& version.version < oldest_version_to_keep
&& version.created_at_secs < input.cutoff_time.timestamp()
{
tracing::info!(
Expand Down Expand Up @@ -155,6 +141,8 @@
CollectionVersionInfo,
};
use chrono::{Duration, Utc};
use proptest::collection::vec;

Check failure on line 144 in rust/garbage_collector/src/operators/compute_versions_to_delete.rs

View workflow job for this annotation

GitHub Actions / Lint

unused import: `proptest::collection::vec`

Check failure on line 144 in rust/garbage_collector/src/operators/compute_versions_to_delete.rs

View workflow job for this annotation

GitHub Actions / Rust tests / test-benches (depot-ubuntu-22.04-16)

unused import: `proptest::collection::vec`

Check failure on line 144 in rust/garbage_collector/src/operators/compute_versions_to_delete.rs

View workflow job for this annotation

GitHub Actions / Rust tests / test-integration (depot-ubuntu-22.04-16)

unused import: `proptest::collection::vec`
use proptest::prelude::*;

#[tokio::test]
async fn test_compute_versions_to_delete() {
Expand Down Expand Up @@ -216,4 +204,206 @@
assert!(!versions[2].marked_for_deletion); // Version 2 should be kept
assert!(!versions[3].marked_for_deletion); // Version 3 should be kept
}

/// Wraps `versions` in a `CollectionVersionFile` for use as operator input.
///
/// The collection metadata is filled with fixed placeholder identifiers
/// (`test_tenant`, `test_db`, `test_collection`) and a dimension of 0; every
/// other metadata field takes its `Default` value.
fn create_version_file(versions: Vec<CollectionVersionInfo>) -> CollectionVersionFile {
    let collection_info = CollectionInfoImmutable {
        tenant_id: String::from("test_tenant"),
        database_id: String::from("test_db"),
        collection_id: String::from("test_collection"),
        dimension: 0,
        ..Default::default()
    };
    let history = CollectionVersionHistory { versions };

    CollectionVersionFile {
        version_history: Some(history),
        collection_info_immutable: Some(collection_info),
    }
}

/// Helper function to create version info with guaranteed ordering.
///
/// The version numbers are sorted ascending and each one is paired with a
/// creation timestamp so that a higher version number never has an earlier
/// timestamp than a lower one.
///
/// * `versions` - version numbers to generate entries for (any order).
/// * `base_time` - timestamp assigned to the highest version number; every
///   lower version is placed a random 1..=60 minutes before its successor.
///
/// Returns one `CollectionVersionInfo` per input version, in ascending version
/// order, with `marked_for_deletion` set to `false` and all remaining fields
/// at their `Default` value.
///
/// NOTE(review): the gaps come from `rand::thread_rng()`, so the generated
/// timestamps are not reproducible from a proptest seed.
fn create_ordered_version_infos(
    versions: Vec<i64>,
    base_time: DateTime<Utc>,
) -> Vec<CollectionVersionInfo> {
    let mut rng = rand::thread_rng();

    // First sort by version number (plain integers, so stability is irrelevant).
    let mut version_infos = versions;
    version_infos.sort_unstable();

    // Start from the most recent version (highest version number)
    // and work backwards, ensuring each previous version has an earlier timestamp.
    let mut timestamps: Vec<DateTime<Utc>> = Vec::with_capacity(version_infos.len());
    let mut current_time = base_time;

    for _ in version_infos.iter().rev() {
        timestamps.push(current_time);
        // Generate a random time difference between 1 minute and 1 hour.
        let minutes_diff = rng.gen_range(1..=60);
        // Compound assignment satisfies clippy's `assign_op_pattern` lint.
        current_time -= Duration::minutes(minutes_diff);
    }
    // Timestamps were produced newest-first; reverse to match version order.
    timestamps.reverse();

    // Create the final version infos.
    version_infos
        .into_iter()
        .zip(timestamps)
        .map(|(version, timestamp)| CollectionVersionInfo {
            version,
            created_at_secs: timestamp.timestamp(),
            marked_for_deletion: false,
            ..Default::default()
        })
        .collect()
}

proptest! {
#![proptest_config(ProptestConfig::with_cases(1000))]

// Property: after the operator runs, the number of distinct versions that
// are NOT marked for deletion is at least `min_versions_to_keep`.
#[test]
fn prop_always_keeps_minimum_versions(
    min_versions_to_keep in 1u32..10u32,
    additional_versions in 0u32..90u32,
    cutoff_hours in 1i64..168i64
) {
    // Version numbers 1..=N, where N always covers the required minimum.
    let version_count = i64::from(min_versions_to_keep + additional_versions);
    let now = Utc::now();
    let input = ComputeVersionsToDeleteInput {
        version_file: create_version_file(create_ordered_version_infos(
            (1..=version_count).collect(),
            now,
        )),
        cutoff_time: now - Duration::hours(cutoff_hours),
        min_versions_to_keep,
    };

    let gc_operator = ComputeVersionsToDeleteOperator {};
    let result = tokio_test::block_on(gc_operator.run(&input)).unwrap();

    // Collect the distinct version numbers that survived.
    let history = result.version_file.version_history.unwrap();
    let mut kept: Vec<i64> = history
        .versions
        .iter()
        .filter(|info| !info.marked_for_deletion)
        .map(|info| info.version)
        .collect();
    kept.sort_unstable();
    kept.dedup();
    let kept_count = kept.len();

    prop_assert!(
        kept_count >= min_versions_to_keep as usize,
        "Expected at least {} versions to be kept, but only {} were kept",
        min_versions_to_keep,
        kept_count
    );
}

// Property: a version created at or after the cutoff time is never marked
// for deletion.
#[test]
fn prop_respects_cutoff_time(
    min_versions_to_keep in 1u32..10u32,
    additional_versions in 0u32..90u32,
    cutoff_hours in 1i64..168i64
) {
    let version_count = i64::from(min_versions_to_keep + additional_versions);
    let now = Utc::now();
    let cutoff_time = now - Duration::hours(cutoff_hours);
    let cutoff_secs = cutoff_time.timestamp();

    let input = ComputeVersionsToDeleteInput {
        version_file: create_version_file(create_ordered_version_infos(
            (1..=version_count).collect(),
            now,
        )),
        cutoff_time,
        min_versions_to_keep,
    };

    let gc_operator = ComputeVersionsToDeleteOperator {};
    let result = tokio_test::block_on(gc_operator.run(&input)).unwrap();

    // Only versions at or past the cutoff are relevant to this property.
    let history = result.version_file.version_history.unwrap();
    let recent = history
        .versions
        .iter()
        .filter(|info| info.created_at_secs >= cutoff_secs);
    for info in recent {
        prop_assert!(
            !info.marked_for_deletion,
            "Version {} created at {} should not be marked for deletion as it's newer than cutoff {}",
            info.version,
            info.created_at_secs,
            cutoff_secs
        );
    }
}

// Property: version 0 is never marked for deletion, regardless of the cutoff
// time or how many versions must be kept.
#[test]
fn prop_version_zero_never_deleted(
    min_versions_to_keep in 1u32..10u32,
    additional_versions in 0u32..90u32,
    cutoff_hours in 1i64..168i64
) {
    let highest_version = i64::from(min_versions_to_keep + additional_versions);
    let now = Utc::now();
    let input = ComputeVersionsToDeleteInput {
        // The range starts at 0 so the history actually contains version 0.
        version_file: create_version_file(create_ordered_version_infos(
            (0..=highest_version).collect(),
            now,
        )),
        cutoff_time: now - Duration::hours(cutoff_hours),
        min_versions_to_keep,
    };

    let gc_operator = ComputeVersionsToDeleteOperator {};
    let result = tokio_test::block_on(gc_operator.run(&input)).unwrap();

    let history = result.version_file.version_history.unwrap();
    for info in history.versions.iter().filter(|info| info.version == 0) {
        prop_assert!(
            !info.marked_for_deletion,
            "Version 0 should never be marked for deletion"
        );
    }
}

// Property: the generated fixture is chronologically consistent (a higher
// version number never has an earlier timestamp than a lower one), and the
// operator runs successfully on such input.
#[test]
fn prop_versions_are_chronologically_ordered(
    min_versions_to_keep in 1u32..10u32,
    additional_versions in 0u32..90u32,
    cutoff_hours in 1i64..168i64
) {
    let version_count = i64::from(min_versions_to_keep + additional_versions);
    let now = Utc::now();
    let version_file = create_version_file(create_ordered_version_infos(
        (1..=version_count).collect(),
        now,
    ));

    // Check every adjacent pair; the borrow of `version_file` ends with this
    // scope so the file can be moved into the operator input afterwards.
    {
        let history = version_file.version_history.as_ref().unwrap();
        for pair in history.versions.windows(2) {
            let (earlier, later) = (&pair[0], &pair[1]);
            if earlier.version >= later.version {
                continue;
            }
            prop_assert!(
                earlier.created_at_secs <= later.created_at_secs,
                "Version {} (timestamp {}) should have earlier or equal timestamp than version {} (timestamp {})",
                earlier.version,
                earlier.created_at_secs,
                later.version,
                later.created_at_secs
            );
        }
    }

    // Run the operator to ensure it works correctly with ordered versions.
    let input = ComputeVersionsToDeleteInput {
        version_file,
        cutoff_time: now - Duration::hours(cutoff_hours),
        min_versions_to_keep,
    };

    let gc_operator = ComputeVersionsToDeleteOperator {};
    let _ = tokio_test::block_on(gc_operator.run(&input)).unwrap();
}
}
}
Loading
Loading