diff --git a/rust/garbage_collector/src/garbage_collector_orchestrator.rs b/rust/garbage_collector/src/garbage_collector_orchestrator.rs index ff29503a50e..5cbf0e826dc 100644 --- a/rust/garbage_collector/src/garbage_collector_orchestrator.rs +++ b/rust/garbage_collector/src/garbage_collector_orchestrator.rs @@ -39,6 +39,7 @@ use std::fmt::{Debug, Formatter}; +use crate::types::CleanupMode; use async_trait::async_trait; use chroma_error::{ChromaError, ErrorCodes}; use chroma_storage::Storage; @@ -448,7 +449,12 @@ impl Handler) -> std::fmt::Result { f.debug_struct("DeleteUnusedFilesOperator") - .field("soft_delete", &self.soft_delete) + .field("cleanup_mode", &self.cleanup_mode) + .field("collection_id", &self.collection_id) .finish_non_exhaustive() } } impl DeleteUnusedFilesOperator { - pub fn new(storage: Storage, soft_delete: bool) -> Self { - tracing::debug!(soft_delete, "Creating new DeleteUnusedFilesOperator"); + pub fn new(storage: Storage, cleanup_mode: CleanupMode, collection_id: String) -> Self { + tracing::debug!( + cleanup_mode = ?cleanup_mode, + collection_id = %collection_id, + "Creating new DeleteUnusedFilesOperator" + ); Self { storage, - soft_delete, + cleanup_mode, + collection_id, } } - fn get_soft_delete_path(&self, path: &str, epoch: i64) -> String { - format!("deleted/{epoch}/{path}") + // TODO(rohit): Remove epoch, or maybe use a timestamp instead.
+ fn get_rename_path(&self, path: &str, epoch: i64) -> String { + format!("gc/deleted/{epoch}/{path}") + } + + fn get_deletion_list_path(&self, timestamp: i64) -> String { + format!("gc/deletion-list/{}/{}.txt", self.collection_id, timestamp) + } + + async fn write_deletion_list( + &self, + files: &[String], + timestamp: i64, + failed_files: &[String], + ) -> Result<(), DeleteUnusedFilesError> { + let all_files: Vec<&str> = files.iter().map(|s| s.as_str()).collect(); + + let content = all_files.join("\n"); + + let mut final_content = content; + if !failed_files.is_empty() { + let mut sorted_failed = failed_files.to_vec(); + sorted_failed.sort(); + final_content.push_str("\n\nFailed files:\n"); + final_content.push_str(&sorted_failed.join("\n")); + } + + let path = self.get_deletion_list_path(timestamp); + + tracing::info!( + path = %path, + file_count = all_files.len(), + failed_count = failed_files.len(), + "Writing deletion list to S3" + ); + + self.storage + .put_bytes(&path, final_content.into_bytes(), Default::default()) + .await + .map_err(|e| DeleteUnusedFilesError::WriteListError { + path: path.clone(), + message: e.to_string(), + })?; + + Ok(()) } } @@ -51,6 +102,8 @@ pub enum DeleteUnusedFilesError { DeleteError { path: String, message: String }, #[error("Error renaming file {path}: {message}")] RenameError { path: String, message: String }, + #[error("Error writing deletion list {path}: {message}")] + WriteListError { path: String, message: String }, } impl ChromaError for DeleteUnusedFilesError { @@ -71,29 +124,16 @@ impl Operator for DeleteUnusedF &self, input: &DeleteUnusedFilesInput, ) -> Result { - tracing::info!( + tracing::debug!( files_count = input.unused_s3_files.len(), hnsw_prefixes_count = input.hnsw_prefixes_for_deletion.len(), files = ?input.unused_s3_files, hnsw_prefixes = ?input.hnsw_prefixes_for_deletion, - soft_delete = self.soft_delete, + cleanup_mode = ?self.cleanup_mode, "Starting deletion of unused files" ); - let mut deleted_files 
= HashSet::new(); - - // Log and delete regular unused files - println!("Deleting unused block files: {:?}", input.unused_s3_files); - for file_path in &input.unused_s3_files { - if !self - .delete_file(file_path, input.epoch_id, &mut deleted_files) - .await? - { - continue; - } - } - - // Log and delete HNSW files for each prefix + // Generate list of HNSW files let hnsw_files: Vec = input .hnsw_prefixes_for_deletion .iter() @@ -109,34 +149,36 @@ impl Operator for DeleteUnusedF .collect::>() }) .collect(); - println!("Deleting HNSW files: {:?}", hnsw_files); - - for prefix in &input.hnsw_prefixes_for_deletion { - for file in [ - "header.bin", - "data_level0.bin", - "length.bin", - "link_lists.bin", - ] - .iter() - { - let file_path = format!("hnsw/{}/{}", prefix, file); - if !self - .delete_file(&file_path, input.epoch_id, &mut deleted_files) - .await? - { - continue; - } + + // Create a list that contains all files that will be deleted. + let mut all_files = input.unused_s3_files.iter().cloned().collect::>(); + all_files.extend(hnsw_files); + + // If we're in list-only mode, write the list and return + if matches!(self.cleanup_mode, CleanupMode::ListOnly) { + self.write_deletion_list(&all_files, input.epoch_id, &[]) + .await?; + return Ok(DeleteUnusedFilesOutput { + deleted_files: all_files.into_iter().collect(), + }); + } + + let mut failed_files = Vec::new(); + for file_path in &all_files { + if !self.delete_file(file_path, input.epoch_id).await? 
{ + failed_files.push(file_path.clone()); } } - tracing::info!( - deleted_count = deleted_files.len(), - deleted_files = ?deleted_files, - "File deletion operation completed successfully" - ); + // Write the deletion list with failed files + self.write_deletion_list(&all_files, input.epoch_id, &failed_files) + .await?; + + tracing::debug!("File deletion operation completed"); - Ok(DeleteUnusedFilesOutput { deleted_files }) + Ok(DeleteUnusedFilesOutput { + deleted_files: all_files.into_iter().collect(), + }) } } @@ -145,59 +187,63 @@ impl DeleteUnusedFilesOperator { &self, file_path: &str, epoch_id: i64, - deleted_files: &mut HashSet, ) -> Result { - if self.soft_delete { - // Soft delete - rename the file - let new_path = self.get_soft_delete_path(file_path, epoch_id); - tracing::info!( - old_path = %file_path, - new_path = %new_path, - "Renaming file for soft delete" - ); - - match self.storage.rename(file_path, &new_path).await { - Ok(_) => { - tracing::info!( - old_path = %file_path, - new_path = %new_path, - "Successfully renamed file" - ); - deleted_files.insert(file_path.to_string()); - Ok(true) - } - Err(e) => { - tracing::error!( - error = %e, - path = %file_path, - "Failed to rename file" - ); - Err(DeleteUnusedFilesError::RenameError { - path: file_path.to_string(), - message: e.to_string(), - }) - } + match self.cleanup_mode { + CleanupMode::ListOnly => { + tracing::info!(path = %file_path, "Would process file (list only mode)"); + Ok(true) } - } else { - // Hard delete - remove the file - tracing::info!(path = %file_path, "Deleting file"); - - match self.storage.delete(file_path).await { - Ok(_) => { - tracing::info!(path = %file_path, "Successfully deleted file"); - deleted_files.insert(file_path.to_string()); - Ok(true) + CleanupMode::Rename => { + // Soft delete - rename the file + let new_path = self.get_rename_path(file_path, epoch_id); + tracing::info!( + old_path = %file_path, + new_path = %new_path, + "Renaming file for soft delete" + ); + + 
match self.storage.rename(file_path, &new_path).await { + Ok(_) => { + tracing::info!( + old_path = %file_path, + new_path = %new_path, + "Successfully renamed file" + ); + Ok(true) + } + Err(e) => { + tracing::error!( + error = %e, + path = %file_path, + "Failed to rename file" + ); + Err(DeleteUnusedFilesError::RenameError { + path: file_path.to_string(), + message: e.to_string(), + }) + } } - Err(e) => { - tracing::error!( - error = %e, - path = %file_path, - "Failed to delete file" - ); - Err(DeleteUnusedFilesError::DeleteError { - path: file_path.to_string(), - message: e.to_string(), - }) + } + CleanupMode::Delete => { + // Hard delete - remove the file + tracing::info!(path = %file_path, "Deleting file"); + + match self.storage.delete(file_path).await { + Ok(_) => { + tracing::info!(path = %file_path, "Successfully deleted file"); + Ok(true) + } + Err(e) => { + tracing::error!( + error = %e, + path = %file_path, + "Failed to delete file" + ); + Err(DeleteUnusedFilesError::DeleteError { + path: file_path.to_string(), + message: e.to_string(), + }) + } } } } @@ -219,121 +265,246 @@ mod tests { .unwrap(); } - #[tokio::test] - async fn test_hard_delete_success() { - let tmp_dir = TempDir::new().unwrap(); - let storage = Storage::Local(LocalStorage::new(tmp_dir.path().to_str().unwrap())); - - // Create test files + async fn setup_test_files(storage: &Storage) -> (Vec, Vec) { + // Create regular test files let test_files = vec!["file1.txt", "file2.txt"]; for file in &test_files { - create_test_file(&storage, file, b"test content").await; + create_test_file(storage, file, b"test content").await; } - // Create HNSW test files with correct filenames - let hnsw_files = [ + // Create HNSW test files + let hnsw_files = vec![ "hnsw/prefix1/header.bin", "hnsw/prefix1/data_level0.bin", "hnsw/prefix1/length.bin", "hnsw/prefix1/link_lists.bin", ]; for file in &hnsw_files { - create_test_file(&storage, file, b"test content").await; + create_test_file(storage, file, b"test 
content").await; } + ( + test_files.iter().map(|s| s.to_string()).collect(), + hnsw_files.iter().map(|s| s.to_string()).collect(), + ) + } + + #[tokio::test] + async fn test_list_only_mode() { + let tmp_dir = TempDir::new().unwrap(); + let storage = Storage::Local(LocalStorage::new(tmp_dir.path().to_str().unwrap())); + let (test_files, hnsw_files) = setup_test_files(&storage).await; + let mut unused_files = HashSet::new(); - unused_files.extend(test_files.iter().map(|s| s.to_string())); + unused_files.extend(test_files.clone()); - let operator = DeleteUnusedFilesOperator::new(storage.clone(), false); + let operator = DeleteUnusedFilesOperator::new( + storage.clone(), + CleanupMode::ListOnly, + "test_collection".to_string(), + ); let input = DeleteUnusedFilesInput { unused_s3_files: unused_files.clone(), epoch_id: 123, hnsw_prefixes_for_deletion: vec!["prefix1".to_string()], }; - let _result = operator.run(&input).await.unwrap(); + let result = operator.run(&input).await.unwrap(); - // Verify regular files were deleted - for file in test_files { - assert!(!Path::new(&tmp_dir.path().join(file)).exists()); + // Verify deletion list file was created + let deletion_list_path = tmp_dir + .path() + .join("gc/deletion-list/test_collection/123.txt"); + assert!(deletion_list_path.exists()); + + // Verify original files still exist + for file in &test_files { + assert!(result.deleted_files.contains(file)); + assert!(Path::new(&tmp_dir.path().join(file)).exists()); } - // Verify HNSW files were deleted - for file in hnsw_files { - assert!(!Path::new(&tmp_dir.path().join(file)).exists()); + // Read and verify deletion list content + let content = std::fs::read_to_string(deletion_list_path).unwrap(); + let listed_files: HashSet<_> = content.lines().collect(); + for file in &test_files { + assert!(listed_files.contains(file.as_str())); + } + for file in &hnsw_files { + assert!(listed_files.contains(file.as_str())); } } #[tokio::test] - async fn test_soft_delete_success() { + 
async fn test_rename_mode() { let tmp_dir = TempDir::new().unwrap(); let storage = Storage::Local(LocalStorage::new(tmp_dir.path().to_str().unwrap())); - - // Create test files - let test_files = vec!["file1.txt", "file2.txt"]; - for file in &test_files { - create_test_file(&storage, file, b"test content").await; - } - - // Create HNSW test files with correct filenames - let hnsw_files = [ - "hnsw/prefix1/header.bin", - "hnsw/prefix1/data_level0.bin", - "hnsw/prefix1/length.bin", - "hnsw/prefix1/link_lists.bin", - ]; - for file in &hnsw_files { - create_test_file(&storage, file, b"test content").await; - } + let (test_files, hnsw_files) = setup_test_files(&storage).await; let mut unused_files = HashSet::new(); - unused_files.extend(test_files.iter().map(|s| s.to_string())); + unused_files.extend(test_files.clone()); - let operator = DeleteUnusedFilesOperator::new(storage.clone(), true); + let operator = DeleteUnusedFilesOperator::new( + storage.clone(), + CleanupMode::Rename, + "test_collection".to_string(), + ); let input = DeleteUnusedFilesInput { unused_s3_files: unused_files.clone(), epoch_id: 123, hnsw_prefixes_for_deletion: vec!["prefix1".to_string()], }; - let _result = operator.run(&input).await.unwrap(); + let result = operator.run(&input).await.unwrap(); + + // Verify deletion list was created + let deletion_list_path = tmp_dir + .path() + .join("gc/deletion-list/test_collection/123.txt"); + assert!(deletion_list_path.exists()); // Verify regular files were moved to deleted directory - for file in test_files { + for file in &test_files { let original_path = tmp_dir.path().join(file); - let new_path = tmp_dir.path().join(format!("deleted/123/{}", file)); + let new_path = tmp_dir.path().join(format!("gc/deleted/123/{}", file)); assert!(!original_path.exists()); assert!(new_path.exists()); + assert!(result.deleted_files.contains(file)); } // Verify HNSW files were moved to deleted directory - for file in hnsw_files { + for file in &hnsw_files { let 
original_path = tmp_dir.path().join(file); - let new_path = tmp_dir.path().join(format!("deleted/123/{}", file)); + let new_path = tmp_dir.path().join(format!("gc/deleted/123/{}", file)); assert!(!original_path.exists()); assert!(new_path.exists()); + assert!(result.deleted_files.contains(file)); + } + + // Verify deletion list contents + let content = std::fs::read_to_string(deletion_list_path).unwrap(); + let listed_files: HashSet<_> = content.lines().collect(); + for file in &test_files { + assert!(listed_files.contains(file.as_str())); + } + for file in &hnsw_files { + assert!(listed_files.contains(file.as_str())); } } #[tokio::test] - async fn test_delete_nonexistent_file() { + async fn test_delete_mode() { let tmp_dir = TempDir::new().unwrap(); let storage = Storage::Local(LocalStorage::new(tmp_dir.path().to_str().unwrap())); + let (test_files, hnsw_files) = setup_test_files(&storage).await; let mut unused_files = HashSet::new(); - unused_files.insert("nonexistent.txt".to_string()); + unused_files.extend(test_files.clone()); - let operator = DeleteUnusedFilesOperator::new(storage, false); + let operator = DeleteUnusedFilesOperator::new( + storage.clone(), + CleanupMode::Delete, + "test_collection".to_string(), + ); let input = DeleteUnusedFilesInput { - unused_s3_files: unused_files, + unused_s3_files: unused_files.clone(), epoch_id: 123, - hnsw_prefixes_for_deletion: vec![], + hnsw_prefixes_for_deletion: vec!["prefix1".to_string()], }; - let result = operator.run(&input).await; + let result = operator.run(&input).await.unwrap(); + + // Verify deletion list was created + let deletion_list_path = tmp_dir + .path() + .join("gc/deletion-list/test_collection/123.txt"); + assert!(deletion_list_path.exists()); + + // Verify regular files were deleted + for file in &test_files { + assert!(!Path::new(&tmp_dir.path().join(file)).exists()); + assert!(result.deleted_files.contains(file)); + } + + // Verify HNSW files were deleted + for file in &hnsw_files { + 
assert!(!Path::new(&tmp_dir.path().join(file)).exists()); + assert!(result.deleted_files.contains(file)); + } + + // Verify deletion list contents + let content = std::fs::read_to_string(deletion_list_path).unwrap(); + let listed_files: HashSet<_> = content.lines().collect(); + for file in &test_files { + assert!(listed_files.contains(file.as_str())); + } + for file in &hnsw_files { + assert!(listed_files.contains(file.as_str())); + } + } + + #[tokio::test] + async fn test_error_handling() { + let tmp_dir = TempDir::new().unwrap(); + let storage = Storage::Local(LocalStorage::new(tmp_dir.path().to_str().unwrap())); + + let mut unused_files = HashSet::new(); + unused_files.insert("nonexistent.txt".to_string()); + + // Test error handling for Delete mode + let delete_operator = DeleteUnusedFilesOperator::new( + storage.clone(), + CleanupMode::Delete, + "test_collection".to_string(), + ); + let result = delete_operator + .run(&DeleteUnusedFilesInput { + unused_s3_files: unused_files.clone(), + epoch_id: 123, + hnsw_prefixes_for_deletion: vec![], + }) + .await; assert!(matches!( result, Err(DeleteUnusedFilesError::DeleteError { .. }) )); + + // Test error handling for Rename mode + let rename_operator = DeleteUnusedFilesOperator::new( + storage.clone(), + CleanupMode::Rename, + "test_collection".to_string(), + ); + let result = rename_operator + .run(&DeleteUnusedFilesInput { + unused_s3_files: unused_files.clone(), + epoch_id: 123, + hnsw_prefixes_for_deletion: vec![], + }) + .await; + assert!(matches!( + result, + Err(DeleteUnusedFilesError::RenameError { .. 
}) + )); + + // Test ListOnly mode with nonexistent files (should succeed) + let list_operator = DeleteUnusedFilesOperator::new( + storage, + CleanupMode::ListOnly, + "test_collection".to_string(), + ); + let result = list_operator + .run(&DeleteUnusedFilesInput { + unused_s3_files: unused_files, + epoch_id: 123, + hnsw_prefixes_for_deletion: vec![], + }) + .await; + assert!(result.is_ok()); + + // Verify deletion list was created even for nonexistent files + let deletion_list_path = tmp_dir + .path() + .join("gc/deletion-list/test_collection/123.txt"); + assert!(deletion_list_path.exists()); } } diff --git a/rust/garbage_collector/src/types.rs b/rust/garbage_collector/src/types.rs new file mode 100644 index 00000000000..5eabdab5689 --- /dev/null +++ b/rust/garbage_collector/src/types.rs @@ -0,0 +1,9 @@ +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum CleanupMode { + /// Only list files that would be affected without making changes + ListOnly, + /// Move files to a deletion directory instead of removing them + Rename, + /// Permanently delete files + Delete, +}