ChromaDB Integration #663

Open · wants to merge 1 commit into main
98 changes: 86 additions & 12 deletions deepeval/synthesizer/context_generator.py
@@ -1,5 +1,6 @@
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import List, Tuple
import chromadb
import random

from deepeval.synthesizer.doc_chunker import (
@@ -10,6 +11,8 @@
from deepeval.models.openai_embedding_model import OpenAIEmbeddingModel
from deepeval.models.base_model import DeepEvalBaseEmbeddingModel

DB_FOLDER = ".vectordb"
DB_COLLECTION_NAME = "synth_vectordb"

class ContextGenerator:
def __init__(
@@ -27,6 +30,7 @@ def __init__(
self.multithreading = multithreading
self.document_paths: List[str] = document_paths

self.client = chromadb.PersistentClient(path=DB_FOLDER)
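# NOTE: PersistentClient stores the collection on disk under DB_FOLDER, so chunk embeddings are reused across runs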
# TODO: Potential bug, calling generate_goldens_from_docs
# twice in a notebook environment will not refresh combined chunks
self.combined_chunks: List[Chunk] = self._load_docs()
@@ -50,17 +54,22 @@ def generate_contexts(

############### Load Docs #############################
def _load_docs(self) -> List[Chunk]:

def process_document(path):
doc_chunker = DocumentChunker(
self.embedder, self.chunk_size, self.chunk_overlap
)
return doc_chunker.load_doc(path)

# check if docs are already in db
new_doc_paths = []
old_doc_paths = []
collection = self.client.get_or_create_collection(name="test")
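# NOTE: get_or_create_collection is idempotent, so repeated runs attach to the existing on-disk collection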
for path in self.document_paths:
docs_from_path = collection.get(where={"source": path})
if len(docs_from_path['documents']) == 0:
new_doc_paths.append(path)
else:
old_doc_paths.append(path)
# create chunks for docs not in db
if len(new_doc_paths) != 0:
print("Calculating embeddings for: " + str(new_doc_paths))
combined_chunks = []

if not self.multithreading:
for path in new_doc_paths:
doc_chunker = DocumentChunker(
self.embedder, self.chunk_size, self.chunk_overlap
)
@@ -69,8 +78,8 @@ def process_document(path):
else:
with ThreadPoolExecutor() as executor:
future_to_path = {
executor.submit(self.process_document, path): path
for path in new_doc_paths
}
for future in as_completed(future_to_path):
path = future_to_path[future]
@@ -79,9 +88,63 @@ def process_document(path):
combined_chunks.extend(chunks)
except Exception as exc:
print(f"{path} generated an exception: {exc}")

self.save_chunks_to_db(collection, combined_chunks)
# create chunks from docs in db
saved_chunks = self.get_chunks_from_db(collection, old_doc_paths)
combined_chunks.extend(saved_chunks)
return combined_chunks


def process_document(self, path):
doc_chunker = DocumentChunker(
self.embedder, self.chunk_size, self.chunk_overlap
)
return doc_chunker.load_doc(path)


def save_chunks_to_db(self, collection, combined_chunks):
if len(combined_chunks) == 0: return
documents = []
embeddings = []
metadatas = []
ids = []
for i, chunk in enumerate(combined_chunks):
documents.append(chunk.content)
embeddings.append(chunk.embedding)
metadatas.append({"source": chunk.source_file,
"similarity_to_mean": chunk.similarity_to_mean})
ids.append(chunk.source_file + str(i))

collection.add(
documents=documents,
embeddings=embeddings,
metadatas=metadatas,
ids=ids
)
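# NOTE: Chroma requires unique ids, so each id combines the source path with the chunk index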

def get_chunks_from_db(self, collection, paths):
chunks = []
where_filter = {}
if len(paths) == 0:
return []
elif len(paths) == 1:
where_filter["source"] = paths[0]
else:
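# A Chroma where-filter matches a single field directly; querying several source paths needs an explicit $or of $eq clauses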
filter["$or"] = [{"source": {"$eq": path}} for path in paths]
print("Getting cached embeddings for: " + str(paths))
results = collection.get(where=where_filter, include=['embeddings', 'documents', 'metadatas'])
for i in range(len(results["ids"])):
chunk = Chunk(
id=results['ids'][i],
content=results['documents'][i],
embedding=results['embeddings'][i],
source_file=results['metadatas'][i]['source'],
similarity_to_mean=results['metadatas'][i]['similarity_to_mean']
)
chunks.append(chunk)
return chunks


############### Search N Chunks ########################
def _get_n_random_clusters(
self, n: int, cluster_size: int
@@ -130,3 +193,14 @@ def _get_n_other_similar_chunks(
similar_chunks = [self.combined_chunks[i] for i in top_n_indices]

return similar_chunks

########################################

import time

if __name__ == "__main__":
start_time = time.time()
generator = ContextGenerator(document_paths=['data/pdf_example.pdf', 'data/txt_example.txt', 'data/docx_example.docx', 'data/large.pdf'])
end_time = time.time()

print(f"Initialization and loading took {end_time - start_time:.2f} seconds.")
10 changes: 8 additions & 2 deletions tests/test_synthesizer.py
@@ -7,12 +7,18 @@
def test_synthesizer():
module_b_dir = os.path.dirname(os.path.realpath(__file__))

file_path_1 = os.path.join(
module_b_dir, "synthesizer_data", "pdf_example.pdf"
)
file_path_2 = os.path.join(
module_b_dir, "synthesizer_data", "txt_example.txt"
)
synthesizer = Synthesizer()

synthesizer.generate_goldens_from_docs(
document_paths=[file_path_1, file_path_2],
max_goldens_per_document=2,
)
synthesizer.save_as(file_type="json", directory="./results")

test_synthesizer()
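Related to the TODO about stale chunks above: because the cache is persistent, the .vectordb folder keeps previously computed embeddings between runs. Below is a small sketch of how a user could reset the cache, assuming the collection name defined in the diff; the helper itself is hypothetical and not part of this PR:

import chromadb

def reset_synthesizer_cache():
    # Drop the cached collection so the next run re-chunks and re-embeds every document.
    client = chromadb.PersistentClient(path=".vectordb")
    try:
        client.delete_collection(name="synth_vectordb")
    except Exception:
        # Nothing to delete on a fresh checkout.
        pass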