Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Avoid doc id duplication #103

Merged
merged 1 commit into from
Feb 3, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 21 additions & 12 deletions src/collection_manager/sides/write/collection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,27 @@ impl CollectionWriter {
sender: OperationSender,
hooks_runtime: Arc<HooksRuntime>,
) -> Result<()> {
// Those `?` is never triggered, but it's here to make the compiler happy:
// The "id" property is always present in the document.
// TODO: do this better
let doc_id_str = doc
.inner
.get("id")
.context("Document does not have an id")?
.as_str()
.context("Document id is not a string")?;
let mut doc_id_storage = self.doc_id_storage.write().await;
if !doc_id_storage.insert_document_id(doc_id_str.to_string(), doc_id) {
// The document is already indexed.
// If the document id is there, it will be difficul to remove it.
// So, we decided to just ignore it.
// We could at least return a warning to the user.
// TODO: return a warning
warn!("Document '{}' already indexed", doc_id_str);
return Ok(());
}
drop(doc_id_storage);

// We send the document to index *before* indexing it, so we can
// guarantee that the document is there during the search.
// Otherwise, we could find the document without having it yet.
Expand All @@ -138,18 +159,6 @@ impl CollectionWriter {
.await
.map_err(|e| anyhow!("Error sending document to index writer: {:?}", e))?;

// Those `?` is never triggered, but it's here to make the compiler happy
// TODO: do this better
let doc_id_str = doc
.inner
.get("id")
.context("Document does not have an id")?
.as_str()
.context("Document id is not a string")?;
let mut doc_id_storage = self.doc_id_storage.write().await;
doc_id_storage.insert_document_id(doc_id_str.to_string(), doc_id);
drop(doc_id_storage);

let fields_to_index = self
.get_fields_to_index(doc.clone(), sender.clone(), hooks_runtime)
.await
Expand Down
10 changes: 8 additions & 2 deletions src/collection_manager/sides/write/collection/doc_id_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,14 @@ impl DocIdStorage {
.collect()
}

pub fn insert_document_id(&mut self, doc_id: String, document_id: DocumentId) {
self.document_id.insert(doc_id, document_id);
#[must_use]
pub fn insert_document_id(&mut self, doc_id: String, document_id: DocumentId) -> bool {
if let std::collections::hash_map::Entry::Vacant(e) = self.document_id.entry(doc_id) {
e.insert(document_id);
true
} else {
false
}
}

pub fn commit(&self, data_dir: PathBuf) -> Result<()> {
Expand Down
72 changes: 68 additions & 4 deletions src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1611,10 +1611,7 @@ async fn test_delete_documents() -> Result<()> {
#[tokio::test(flavor = "multi_thread", worker_threads = 10)]
async fn test_array_types() -> Result<()> {
let _ = tracing_subscriber::fmt::try_init();
let mut config = create_oramacore_config();
config.reader_side.config.insert_batch_commit_size = 1_000_000;
config.writer_side.config.insert_batch_commit_size = 1_000_000;

let config = create_oramacore_config();
let (write_side, read_side) = create(config.clone()).await?;

let collection_id = CollectionId("test-collection".to_string());
Expand Down Expand Up @@ -1694,6 +1691,73 @@ async fn test_array_types() -> Result<()> {
Ok(())
}

#[tokio::test(flavor = "multi_thread", worker_threads = 10)]
async fn test_document_duplication() -> Result<()> {
let _ = tracing_subscriber::fmt::try_init();
let config = create_oramacore_config();
let (write_side, read_side) = create(config.clone()).await?;

let collection_id = CollectionId("test-collection".to_string());
write_side
.create_collection(
ApiKey(Secret::new("my-master-api-key".to_string())),
json!({
"id": collection_id.0.clone(),
"read_api_key": "my-read-api-key",
"write_api_key": "my-write-api-key",
})
.try_into()?,
)
.await?;

insert_docs(
write_side.clone(),
ApiKey(Secret::new("my-write-api-key".to_string())),
collection_id.clone(),
vec![json!({
"id": "1",
"text": "B",
})],
)
.await?;
insert_docs(
write_side.clone(),
ApiKey(Secret::new("my-write-api-key".to_string())),
collection_id.clone(),
vec![json!({
"id": "1",
"text": "C",
})],
)
.await?;

let result = read_side
.search(
ApiKey(Secret::new("my-read-api-key".to_string())),
collection_id.clone(),
json!({
"term": "B",
})
.try_into()?,
)
.await?;
assert_eq!(result.count, 1);

let result = read_side
.search(
ApiKey(Secret::new("my-read-api-key".to_string())),
collection_id.clone(),
json!({
"term": "C",
})
.try_into()?,
)
.await?;
assert_eq!(result.count, 0);

Ok(())
}

async fn create_collection(write_side: Arc<WriteSide>, collection_id: CollectionId) -> Result<()> {
write_side
.create_collection(
Expand Down