From f2d8f5ccc9ca99762f41ff56aec35a516454c5cf Mon Sep 17 00:00:00 2001 From: Tommaso Allevi Date: Mon, 3 Feb 2025 16:13:01 +0100 Subject: [PATCH] Avoid doc id duplication --- .../sides/write/collection.rs | 33 +++++---- .../sides/write/collection/doc_id_storage.rs | 10 ++- src/tests.rs | 72 +++++++++++++++++-- 3 files changed, 97 insertions(+), 18 deletions(-) diff --git a/src/collection_manager/sides/write/collection.rs b/src/collection_manager/sides/write/collection.rs index 6c73d91..e8c3c0d 100644 --- a/src/collection_manager/sides/write/collection.rs +++ b/src/collection_manager/sides/write/collection.rs @@ -124,6 +124,27 @@ impl CollectionWriter { sender: OperationSender, hooks_runtime: Arc, ) -> Result<()> { + // Those `?` are never triggered, but they're here to make the compiler happy: + // The "id" property is always present in the document. + // TODO: do this better + let doc_id_str = doc + .inner + .get("id") + .context("Document does not have an id")? + .as_str() + .context("Document id is not a string")?; + let mut doc_id_storage = self.doc_id_storage.write().await; + if !doc_id_storage.insert_document_id(doc_id_str.to_string(), doc_id) { + // The document is already indexed. + // If the document id is there, it will be difficult to remove it. + // So, we decided to just ignore it. + // We could at least return a warning to the user. + // TODO: return a warning + warn!("Document '{}' already indexed", doc_id_str); + return Ok(()); + } + drop(doc_id_storage); + // We send the document to index *before* indexing it, so we can // guarantee that the document is there during the search. // Otherwise, we could find the document without having it yet. @@ -138,18 +159,6 @@ impl CollectionWriter { .await .map_err(|e| anyhow!("Error sending document to index writer: {:?}", e))?; - // Those `?` is never triggered, but it's here to make the compiler happy - // TODO: do this better - let doc_id_str = doc - .inner - .get("id") - .context("Document does not have an id")? 
- .as_str() - .context("Document id is not a string")?; - let mut doc_id_storage = self.doc_id_storage.write().await; - doc_id_storage.insert_document_id(doc_id_str.to_string(), doc_id); - drop(doc_id_storage); - let fields_to_index = self .get_fields_to_index(doc.clone(), sender.clone(), hooks_runtime) .await diff --git a/src/collection_manager/sides/write/collection/doc_id_storage.rs b/src/collection_manager/sides/write/collection/doc_id_storage.rs index 9d33c1e..b2cc30d 100644 --- a/src/collection_manager/sides/write/collection/doc_id_storage.rs +++ b/src/collection_manager/sides/write/collection/doc_id_storage.rs @@ -22,8 +22,14 @@ impl DocIdStorage { .collect() } - pub fn insert_document_id(&mut self, doc_id: String, document_id: DocumentId) { - self.document_id.insert(doc_id, document_id); + #[must_use] + pub fn insert_document_id(&mut self, doc_id: String, document_id: DocumentId) -> bool { + if let std::collections::hash_map::Entry::Vacant(e) = self.document_id.entry(doc_id) { + e.insert(document_id); + true + } else { + false + } } pub fn commit(&self, data_dir: PathBuf) -> Result<()> { diff --git a/src/tests.rs b/src/tests.rs index 7bc7ee8..b5d6684 100644 --- a/src/tests.rs +++ b/src/tests.rs @@ -1611,10 +1611,7 @@ async fn test_delete_documents() -> Result<()> { #[tokio::test(flavor = "multi_thread", worker_threads = 10)] async fn test_array_types() -> Result<()> { let _ = tracing_subscriber::fmt::try_init(); - let mut config = create_oramacore_config(); - config.reader_side.config.insert_batch_commit_size = 1_000_000; - config.writer_side.config.insert_batch_commit_size = 1_000_000; - + let config = create_oramacore_config(); let (write_side, read_side) = create(config.clone()).await?; let collection_id = CollectionId("test-collection".to_string()); @@ -1694,6 +1691,73 @@ async fn test_array_types() -> Result<()> { Ok(()) } +#[tokio::test(flavor = "multi_thread", worker_threads = 10)] +async fn test_document_duplication() -> Result<()> { + let _ = 
tracing_subscriber::fmt::try_init(); + let config = create_oramacore_config(); + let (write_side, read_side) = create(config.clone()).await?; + + let collection_id = CollectionId("test-collection".to_string()); + write_side + .create_collection( + ApiKey(Secret::new("my-master-api-key".to_string())), + json!({ + "id": collection_id.0.clone(), + "read_api_key": "my-read-api-key", + "write_api_key": "my-write-api-key", + }) + .try_into()?, + ) + .await?; + + insert_docs( + write_side.clone(), + ApiKey(Secret::new("my-write-api-key".to_string())), + collection_id.clone(), + vec![json!({ + "id": "1", + "text": "B", + })], + ) + .await?; + insert_docs( + write_side.clone(), + ApiKey(Secret::new("my-write-api-key".to_string())), + collection_id.clone(), + vec![json!({ + "id": "1", + "text": "C", + })], + ) + .await?; + + let result = read_side + .search( + ApiKey(Secret::new("my-read-api-key".to_string())), + collection_id.clone(), + json!({ + "term": "B", + }) + .try_into()?, + ) + .await?; + assert_eq!(result.count, 1); + + let result = read_side + .search( + ApiKey(Secret::new("my-read-api-key".to_string())), + collection_id.clone(), + json!({ + "term": "C", + }) + .try_into()?, + ) + .await?; + assert_eq!(result.count, 0); + + Ok(()) +} + async fn create_collection(write_side: Arc, collection_id: CollectionId) -> Result<()> { write_side .create_collection(