diff --git a/deepeval/synthesizer/context_generator.py b/deepeval/synthesizer/context_generator.py index a38fb79a1..934f72123 100644 --- a/deepeval/synthesizer/context_generator.py +++ b/deepeval/synthesizer/context_generator.py @@ -1,5 +1,6 @@ from concurrent.futures import ThreadPoolExecutor, as_completed from typing import List, Tuple +import chromadb import random from deepeval.synthesizer.doc_chunker import ( @@ -10,6 +11,8 @@ from deepeval.models.openai_embedding_model import OpenAIEmbeddingModel from deepeval.models.base_model import DeepEvalBaseEmbeddingModel +DB_FOLDER = ".vectordb" +DB_COLLECTION_NAME = "synth_vectordb" class ContextGenerator: def __init__( @@ -27,6 +30,7 @@ def __init__( self.multithreading = multithreading self.document_paths: List[str] = document_paths + self.client = chromadb.PersistentClient(path=DB_FOLDER) # TODO: Potential bug, calling generate_goldens_from_docs # twice in a notebook enviornment will not refresh combined chunks self.combined_chunks: List[Chunk] = self._load_docs() @@ -50,17 +54,22 @@ def generate_contexts( ############### Load Docs ############################# def _load_docs(self) -> List[Chunk]: - - def process_document(path): - doc_chunker = DocumentChunker( - self.embedder, self.chunk_size, self.chunk_overlap - ) - return doc_chunker.load_doc(path) - + # check if docs are already in db + new_doc_paths = [] + old_doc_paths = [] + collection = self.client.get_or_create_collection(name="test") + for path in self.document_paths: + docs_from_path = collection.get(where={"source": path}) + if len(docs_from_path['documents']) == 0: + new_doc_paths.append(path) + else: + old_doc_paths.append(path) + # create chunks for docs not in db + if len(new_doc_paths) != 0: + print("Calculating embeddings for: " + str(new_doc_paths)) combined_chunks = [] - if not self.multithreading: - for path in self.document_paths: + for path in new_doc_paths: doc_chunker = DocumentChunker( self.embedder, self.chunk_size, self.chunk_overlap ) @@ -69,8 +78,8 @@ def process_document(path): else: with ThreadPoolExecutor() as executor: future_to_path = { - executor.submit(process_document, path): path - for path in self.document_paths + executor.submit(self.process_document, path): path + for path in new_doc_paths } for future in as_completed(future_to_path): path = future_to_path[future] @@ -79,9 +88,63 @@ def process_document(path): combined_chunks.extend(chunks) except Exception as exc: print(f"{path} generated an exception: {exc}") - + self.save_chunks_to_db(collection, combined_chunks) + # create chunks from docs in db + saved_chunks = self.get_chunks_from_db(collection, old_doc_paths) + combined_chunks.extend(saved_chunks) return combined_chunks + + def process_document(self, path): + doc_chunker = DocumentChunker( + self.embedder, self.chunk_size, self.chunk_overlap + ) + return doc_chunker.load_doc(path) + + + def save_chunks_to_db(self, collection, combined_chunks): + if len(combined_chunks) == 0: return + documents = [] + embeddings = [] + metadatas=[] + ids=[] + for i, chunk in enumerate(combined_chunks): + documents.append(chunk.content) + embeddings.append(chunk.embedding) + metadatas.append({"source": chunk.source_file, + "similarity_to_mean": chunk.similarity_to_mean}) + ids.append(chunk.source_file + str(i)) + + collection.add( + documents=documents, + embeddings=embeddings, + metadatas=metadatas, + ids=ids + ) + + def get_chunks_from_db(self, collection, paths): + chunks = [] + filter = {} + if len(paths) == 0: + return [] + elif len(paths) == 1: + filter["source"] = paths[0] + else: + filter["$or"] = [{"source": {"$eq": path}} for path in paths] + print("Getting cached embeddings for: " + str(paths)) + results = collection.get(where=filter, include=['embeddings', 'documents', 'metadatas']) + for i in range(len(results["ids"])): + chunk = Chunk( + id=results['ids'][i], + content=results['documents'][i], + embedding=results['embeddings'][i], + source_file=results['metadatas'][i]['source'], + similarity_to_mean=results['metadatas'][i]['similarity_to_mean'] + ) + chunks.append(chunk) + return chunks + + ############### Search N Chunks ######################## def _get_n_random_clusters( self, n: int, cluster_size: int @@ -130,3 +193,14 @@ def _get_n_other_similar_chunks( similar_chunks = [self.combined_chunks[i] for i in top_n_indices] return similar_chunks + +######################################## + +import time + +if __name__ == "__main__": + start_time = time.time() + generator = ContextGenerator(document_paths=['data/pdf_example.pdf', 'data/txt_example.txt', 'data/docx_example.docx', 'data/large.pdf']) + end_time = time.time() + + print(f"Initialization and loading took {end_time - start_time:.2f} seconds.") diff --git a/tests/test_synthesizer.py b/tests/test_synthesizer.py index 79844e917..6c9ca7dde 100644 --- a/tests/test_synthesizer.py +++ b/tests/test_synthesizer.py @@ -7,12 +7,18 @@ def test_synthesizer(): module_b_dir = os.path.dirname(os.path.realpath(__file__)) - file_path = os.path.join( + file_path_1 = os.path.join( module_b_dir, "synthesizer_data", "pdf_example.pdf" ) + file_path_2 = os.path.join( + module_b_dir, "synthesizer_data", "txt_example.txt" + ) synthesizer = Synthesizer() + synthesizer.generate_goldens_from_docs( - document_paths=[file_path], + document_paths=[file_path_1, file_path_2], max_goldens_per_document=2, ) synthesizer.save_as(file_type="json", directory="./results") + +test_synthesizer() \ No newline at end of file