diff --git a/rust/garbage_collector/Cargo.toml b/rust/garbage_collector/Cargo.toml index 15302267ee0..d7f312b7921 100644 --- a/rust/garbage_collector/Cargo.toml +++ b/rust/garbage_collector/Cargo.toml @@ -52,3 +52,4 @@ chroma-cache = { workspace = true } [dev-dependencies] proptest = { workspace = true } rand = { workspace = true } +tokio-test = "0.4" diff --git a/rust/garbage_collector/src/operators/compute_versions_to_delete.rs b/rust/garbage_collector/src/operators/compute_versions_to_delete.rs index 3b7f140429b..f9d829d6ffd 100644 --- a/rust/garbage_collector/src/operators/compute_versions_to_delete.rs +++ b/rust/garbage_collector/src/operators/compute_versions_to_delete.rs @@ -3,6 +3,7 @@ use chroma_error::{ChromaError, ErrorCodes}; use chroma_system::{Operator, OperatorType}; use chroma_types::chroma_proto::{CollectionVersionFile, VersionListForCollection}; use chrono::{DateTime, Utc}; +use rand::Rng; use thiserror::Error; #[derive(Clone, Debug)] @@ -52,8 +53,6 @@ impl Operator &self, input: &ComputeVersionsToDeleteInput, ) -> Result { - tracing::info!("Starting compute versions to delete"); - let mut version_file = input.version_file.clone(); let collection_info = version_file .collection_info_immutable @@ -63,37 +62,30 @@ impl Operator ComputeVersionsToDeleteError::ComputeError("Missing collection info".to_string()) })?; - tracing::info!( + tracing::debug!( tenant = %collection_info.tenant_id, database = %collection_info.database_id, collection = %collection_info.collection_id, - "Processing collection" + "Processing collection to compute versions to delete" ); let mut marked_versions = Vec::new(); let mut oldest_version_to_keep = 0; if let Some(ref mut version_history) = version_file.version_history { - tracing::info!( + tracing::debug!( "Processing {} versions in history", version_history.versions.len() ); let mut unique_versions_seen = 0; let mut last_version = None; - let mut oldest_version_min_criteria = None; // First pass: find the oldest version 
that must be kept for version in version_history.versions.iter().rev() { if last_version != Some(version.version) { unique_versions_seen += 1; - oldest_version_min_criteria = Some(version.version); oldest_version_to_keep = version.version; - tracing::debug!( - version = version.version, - unique_versions = unique_versions_seen, - "Processing version" - ); if unique_versions_seen == input.min_versions_to_keep { break; } @@ -101,16 +93,10 @@ impl Operator } } - tracing::info!( - oldest_version = ?oldest_version_min_criteria, - min_versions = input.min_versions_to_keep, - "Found oldest version to keep" - ); - // Second pass: mark for deletion if older than oldest_kept AND before cutoff for version in version_history.versions.iter_mut() { if version.version != 0 - && version.version < oldest_version_min_criteria.unwrap_or(i64::MAX) + && version.version < oldest_version_to_keep && version.created_at_secs < input.cutoff_time.timestamp() { tracing::info!( @@ -155,6 +141,8 @@ mod tests { CollectionVersionInfo, }; use chrono::{Duration, Utc}; + use proptest::collection::vec; + use proptest::prelude::*; #[tokio::test] async fn test_compute_versions_to_delete() { @@ -216,4 +204,206 @@ mod tests { assert!(!versions[2].marked_for_deletion); // Version 2 should be kept assert!(!versions[3].marked_for_deletion); // Version 3 should be kept } + + /// Helper function to create a version file with given versions + fn create_version_file(versions: Vec<CollectionVersionInfo>) -> CollectionVersionFile { + CollectionVersionFile { + version_history: Some(CollectionVersionHistory { versions }), + collection_info_immutable: Some(CollectionInfoImmutable { + tenant_id: "test_tenant".to_string(), + database_id: "test_db".to_string(), + collection_id: "test_collection".to_string(), + dimension: 0, + ..Default::default() + }), + } + } + + /// Helper function to create version info with guaranteed ordering + fn create_ordered_version_infos( + versions: Vec<i64>, + base_time: DateTime<Utc>, + ) -> Vec<CollectionVersionInfo> { + let mut rng =
rand::thread_rng(); + + // First sort by version number + let mut version_infos = versions; + version_infos.sort(); + + // Start from the most recent version (highest version number) + // and work backwards, ensuring each previous version has an earlier timestamp + let mut timestamps: Vec<DateTime<Utc>> = Vec::with_capacity(version_infos.len()); + let mut current_time = base_time; + + for _ in version_infos.iter().rev() { + timestamps.push(current_time); + // Generate a random time difference between 1 minute and 1 hour + let minutes_diff = rng.gen_range(1..=60); + current_time = current_time - Duration::minutes(minutes_diff); + } + timestamps.reverse(); // Reverse to match version order + + // Create the final version infos + version_infos + .into_iter() + .zip(timestamps) + .map(|(version, timestamp)| CollectionVersionInfo { + version, + created_at_secs: timestamp.timestamp(), + marked_for_deletion: false, + ..Default::default() + }) + .collect() + } + + proptest! { + #![proptest_config(ProptestConfig::with_cases(1000))] + + #[test] + fn prop_always_keeps_minimum_versions( + min_versions_to_keep in 1u32..10u32, + additional_versions in 0u32..90u32, + cutoff_hours in 1i64..168i64 + ) { + let total_versions = min_versions_to_keep + additional_versions; + let versions: Vec<i64> = (1..=total_versions as i64).collect(); + + let now = Utc::now(); + let version_infos = create_ordered_version_infos(versions, now); + let version_file = create_version_file(version_infos); + let input = ComputeVersionsToDeleteInput { + version_file, + cutoff_time: now - Duration::hours(cutoff_hours), + min_versions_to_keep, + }; + + let operator = ComputeVersionsToDeleteOperator {}; + let result = tokio_test::block_on(operator.run(&input)).unwrap(); + + // Count unique versions that are not marked for deletion + let versions = &result.version_file.version_history.unwrap().versions; + let mut unique_kept_versions = versions.iter() + .filter(|v| !v.marked_for_deletion) + .map(|v| v.version) + .collect::<Vec<_>>(); +
unique_kept_versions.sort(); + unique_kept_versions.dedup(); + + prop_assert!( + unique_kept_versions.len() >= min_versions_to_keep as usize, + "Expected at least {} versions to be kept, but only {} were kept", + min_versions_to_keep, + unique_kept_versions.len() + ); + } + + #[test] + fn prop_respects_cutoff_time( + min_versions_to_keep in 1u32..10u32, + additional_versions in 0u32..90u32, + cutoff_hours in 1i64..168i64 + ) { + let total_versions = min_versions_to_keep + additional_versions; + let versions: Vec<i64> = (1..=total_versions as i64).collect(); + + let now = Utc::now(); + let version_infos = create_ordered_version_infos(versions, now); + let version_file = create_version_file(version_infos); + let cutoff_time = now - Duration::hours(cutoff_hours); + let input = ComputeVersionsToDeleteInput { + version_file, + cutoff_time, + min_versions_to_keep, + }; + + let operator = ComputeVersionsToDeleteOperator {}; + let result = tokio_test::block_on(operator.run(&input)).unwrap(); + + // Verify no versions newer than cutoff_time are marked for deletion + let versions = &result.version_file.version_history.unwrap().versions; + for version in versions { + if version.created_at_secs >= cutoff_time.timestamp() { + prop_assert!(!version.marked_for_deletion, + "Version {} created at {} should not be marked for deletion as it's newer than cutoff {}", + version.version, + version.created_at_secs, + cutoff_time.timestamp() + ); + } + } + } + + #[test] + fn prop_version_zero_never_deleted( + min_versions_to_keep in 1u32..10u32, + additional_versions in 0u32..90u32, + cutoff_hours in 1i64..168i64 + ) { + let total_versions = min_versions_to_keep + additional_versions; + // Start from 0 for this test to include version 0 + let versions: Vec<i64> = (0..=total_versions as i64).collect(); + + let now = Utc::now(); + let version_infos = create_ordered_version_infos(versions, now); + let version_file = create_version_file(version_infos); + let input = ComputeVersionsToDeleteInput { +
version_file, + cutoff_time: now - Duration::hours(cutoff_hours), + min_versions_to_keep, + }; + + let operator = ComputeVersionsToDeleteOperator {}; + let result = tokio_test::block_on(operator.run(&input)).unwrap(); + + // Verify version 0 is never marked for deletion + let versions = &result.version_file.version_history.unwrap().versions; + for version in versions { + if version.version == 0 { + prop_assert!(!version.marked_for_deletion, + "Version 0 should never be marked for deletion" + ); + } + } + } + + #[test] + fn prop_versions_are_chronologically_ordered( + min_versions_to_keep in 1u32..10u32, + additional_versions in 0u32..90u32, + cutoff_hours in 1i64..168i64 + ) { + let total_versions = min_versions_to_keep + additional_versions; + let versions: Vec<i64> = (1..=total_versions as i64).collect(); + + let now = Utc::now(); + let version_infos = create_ordered_version_infos(versions, now); + + // Verify that higher version numbers have later timestamps + let version_file = create_version_file(version_infos); + let versions = &version_file.version_history.as_ref().unwrap().versions; + + for window in versions.windows(2) { + if window[0].version < window[1].version { + prop_assert!( + window[0].created_at_secs <= window[1].created_at_secs, + "Version {} (timestamp {}) should have earlier or equal timestamp than version {} (timestamp {})", + window[0].version, + window[0].created_at_secs, + window[1].version, + window[1].created_at_secs + ); + } + } + + // Run the operator to ensure it works correctly with ordered versions + let input = ComputeVersionsToDeleteInput { + version_file, + cutoff_time: now - Duration::hours(cutoff_hours), + min_versions_to_keep, + }; + + let operator = ComputeVersionsToDeleteOperator {}; + let _ = tokio_test::block_on(operator.run(&input)).unwrap(); + } + } } diff --git a/rust/garbage_collector/tests/garbage_collector_prop_test.rs b/rust/garbage_collector/tests/garbage_collector_prop_test.rs index c685dac969f..fe37c322883 100644 ---
a/rust/garbage_collector/tests/garbage_collector_prop_test.rs +++ b/rust/garbage_collector/tests/garbage_collector_prop_test.rs @@ -106,6 +106,12 @@ mod tests { "Version increased as expected" ); return Ok(()); + } else { + tracing::info!( + max_version, + expected_version, + "Version has not yet increased" + ); } } } @@ -113,6 +119,69 @@ mod tests { Err("Timeout waiting for version to increase".into()) } + async fn verify_version_ordering( + clients: &mut ChromaGrpcClients, + collection_id: &str, + tenant_id: &str, + ) -> Result> { + let versions = clients + .list_collection_versions(collection_id, tenant_id, Some(100), None, None) + .await?; + + // Check if versions are ordered (should be in ascending order) + let mut prev_version = 0; // Start from 0 for ascending order + for version in versions.versions.iter() { + if version.version < prev_version { + tracing::error!( + "Versions not in ascending order. Full version list: {:?}", + versions + .versions + .iter() + .map(|v| v.version) + .collect::>() + ); + return Ok(false); + } + prev_version = version.version; + } + + Ok(true) + } + + async fn verify_versions_after_cutoff( + clients: &mut ChromaGrpcClients, + collection_id: &str, + tenant_id: &str, + cutoff_time: i64, + min_versions: usize, + ) -> Result> { + let versions = clients + .list_collection_versions(collection_id, tenant_id, Some(100), None, None) + .await?; + + // If we have fewer versions than min_versions, all versions should be kept + if versions.versions.len() < min_versions { + return Ok(true); + } + + // Check that all remaining versions are either: + // 1. After the cutoff time, or + // 2. 
Among the minimum required versions + let mut sorted_versions: Vec<_> = versions.versions.iter().collect(); + sorted_versions.sort_by_key(|v| v.created_at_secs); + + // All other versions should be after cutoff + let remaining_versions: Vec<_> = sorted_versions.iter().skip(min_versions).collect(); + + for version in remaining_versions { + if version.created_at_secs < cutoff_time { + return Ok(false); + } + } + + Ok(true) + } + proptest! { #![proptest_config(ProptestConfig{ cases: 10, @@ -122,9 +191,9 @@ mod tests { })] #[test] fn test_k8s_integration_record_count_preserved_after_gc( - num_records in 22..100usize, + num_records in 40..100usize, num_gc_runs in 1..2usize, - num_insert_batches in 1..2usize, + num_insert_batches in 1..4usize, ) { // Initialize tracing subscriber for logging let _ = tracing_subscriber::fmt() @@ -284,4 +353,156 @@ mod tests { })?; } } + + proptest! { + #[test] + fn test_k8s_integration_gc_version_properties( + num_records in 50..100usize, + num_batches in 2..5usize, + cutoff_hours in 0..24i64, + ) { + // Initialize tracing subscriber for logging + let _ = tracing_subscriber::fmt() + .with_max_level(tracing::Level::INFO) + .with_test_writer() + .try_init(); + + let runtime = tokio::runtime::Runtime::new().unwrap(); + + runtime.block_on(async { + // Setup test environment + let test_uuid = Uuid::new_v4(); + let tenant_id = format!("test_tenant_{}", test_uuid); + let database_name = format!("test_db_{}", test_uuid); + let collection_name = format!("test_collection_{}", test_uuid); + + // Initialize clients and storage + let mut clients = ChromaGrpcClients::new().await.unwrap(); + + // Create storage config and client + let storage_config = StorageConfig::ObjectStore(ObjectStoreConfig { + bucket: ObjectStoreBucketConfig { + name: "chroma-storage".to_string(), + r#type: ObjectStoreType::Minio, + }, + upload_part_size_bytes: 1024 * 1024, + download_part_size_bytes: 1024 * 1024, + max_concurrent_requests: 10, + }); + + let registry = 
Registry::new(); + let storage = Storage::try_from_config(&storage_config, &registry).await.unwrap(); + + // Create collection + let collection_id = clients.create_database_and_collection( + &tenant_id, + &database_name, + &collection_name, + ).await.unwrap(); + + tracing::info!("Created collection with id: {}", collection_id); + // Create and add embeddings in multiple batches with time delays + let records_per_batch = num_records / num_batches; + let (all_embeddings, all_ids) = create_random_embeddings(num_records); + + let mut current_version = 0; + for i in 0..num_batches { + let start_idx = i * records_per_batch; + let end_idx = if i == num_batches - 1 { + num_records + } else { + (i + 1) * records_per_batch + }; + + let batch_embeddings = all_embeddings[start_idx..end_idx].to_vec(); + let batch_ids = all_ids[start_idx..end_idx].to_vec(); + + // Add embeddings and wait for version to increase + add_embeddings_and_wait_for_version( + &mut clients, + &collection_id, + &tenant_id, + batch_embeddings, + batch_ids, + current_version + 1, + 10, + ).await.unwrap(); + + current_version += 1; + + // Add artificial delay between batches + tokio::time::sleep(Duration::from_secs(1)).await; + } + + // Calculate cutoff time + let cutoff_time = (chrono::Utc::now() - chrono::Duration::hours(cutoff_hours)) + .timestamp(); + + // Run garbage collection + let sysdb_config = SysDbConfig::Grpc(GrpcSysDbConfig { + host: "localhost".to_string(), + port: 50051, + connect_timeout_ms: 5000, + request_timeout_ms: 10000, + num_channels: 1, + }); + + let mut sysdb = SysDb::try_from_config(&sysdb_config, &registry).await.unwrap(); + + // Get collection info for GC + let collections_to_gc = sysdb.get_collections_to_gc().await.unwrap(); + let collection_info = collections_to_gc.iter() + .find(|c| c.id.0.to_string() == collection_id) + .expect("Collection should be available for GC"); + + // Run GC with the specified cutoff time + run_gc( + &collection_id, + &collection_info.version_file_path, +
storage.clone(), + sysdb.clone(), + ).await.unwrap(); + + // Give GC time to complete + tokio::time::sleep(Duration::from_secs(2)).await; + + // Verify properties + + // 1. Check version ordering + prop_assert!( + verify_version_ordering(&mut clients, &collection_id, &tenant_id).await.unwrap(), + "Versions are not properly ordered" + ); + + // 2. Check versions relative to cutoff time + prop_assert!( + verify_versions_after_cutoff( + &mut clients, + &collection_id, + &tenant_id, + cutoff_time, + 2 // minimum versions to keep + ).await.unwrap(), + "Versions don't satisfy cutoff time requirements" + ); + + // 3. Verify data integrity + let final_records = clients.get_records( + &collection_id, + None, + true, + false, + false, + ).await.unwrap(); + + prop_assert_eq!( + final_records.ids.len(), + num_records, + "Record count changed after GC" + ); + + Ok(()) + })?; + } + } }