V0
ashwinzyx committed Oct 23, 2024
1 parent f11b269 commit 2820e30
Showing 8 changed files with 844 additions and 0 deletions.
386 changes: 386 additions & 0 deletions ragbuilder_sdk_retriever.ipynb

Large diffs are not rendered by default.

41 changes: 41 additions & 0 deletions src/ragbuilder/rag_templates/sota/test.py
@@ -0,0 +1,41 @@
from langchain_community.llms import Ollama
from langchain_community.document_loaders import *
from langchain_community.embeddings import OllamaEmbeddings
from langchain_community.retrievers import BM25Retriever
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_chroma import Chroma
from operator import itemgetter
from langchain import hub
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnablePassthrough, RunnableParallel, RunnableLambda
from langchain.retrievers import MergerRetriever
from langchain.retrievers.document_compressors import DocumentCompressorPipeline
from langchain_ollama import ChatOllama
from langchain_groq import ChatGroq
from langchain_openai import AzureOpenAIEmbeddings, AzureChatOpenAI
from langchain_google_genai import ChatGoogleGenerativeAI, GoogleGenerativeAIEmbeddings
from langchain_google_vertexai import ChatVertexAI, VertexAIEmbeddings
def rag_pipeline():
    try:
        def format_docs(docs):
            return "\n".join(doc.page_content for doc in docs)

        # NOTE: `docs`, `embedding`, and `llm` are assumed to be bound by the
        # surrounding template context, e.g. (illustrative placeholders only):
        # docs = WebBaseLoader("https://example.com").load()
        # embedding = OllamaEmbeddings(model="nomic-embed-text")
        # llm = ChatOllama(model="llama3")

        # Split documents into overlapping chunks
        splitter = RecursiveCharacterTextSplitter(chunk_size=1600, chunk_overlap=200)
        splits = splitter.split_documents(docs)

        # Index the chunks in a Chroma vector store
        c = Chroma.from_documents(documents=splits, embedding=embedding, collection_name='testindex-ragbuilder')

        # Combine a dense similarity retriever with a sparse BM25 retriever
        retrievers = []
        retriever = c.as_retriever(search_type='similarity', search_kwargs={'k': 5})
        retrievers.append(retriever)
        retriever = BM25Retriever.from_documents(docs)
        retrievers.append(retriever)
        retriever = MergerRetriever(retrievers=retrievers)

        # Assemble the RAG chain: retrieve, format context, prompt the LLM, parse output
        prompt = hub.pull("rlm/rag-prompt")
        rag_chain = (
            RunnableParallel(context=retriever, question=RunnablePassthrough())
            .assign(context=itemgetter("context") | RunnableLambda(format_docs))
            .assign(answer=prompt | llm | StrOutputParser())
            .pick(["answer", "context"]))
        return rag_chain
    except Exception as e:
        print(f"An error occurred: {e}")
Empty file.
150 changes: 150 additions & 0 deletions src/ragbuilder/retriever/config.py
@@ -0,0 +1,150 @@
from pydantic import BaseModel, Field, ValidationError
from typing import List, Optional, Union, Dict, Any
import yaml
import time
import random
from enum import Enum

class RetrieverType(str, Enum):
VECTOR_SEARCH = "vector"
BM25_RETRIEVER = "bm25"
# PARENT_DOC_RETRIEVER_FULL = "parent_doc_full"
# PARENT_DOC_RETRIEVER_LARGE = "parent_doc_large"
# COLBERT_RETRIEVER = "colbert"
CUSTOM = "custom"

class ReRanker(str, Enum):
    FLASHRANK = "flashrank"
    RANKGPT = "rankGPT"
    CUSTOM = "custom"

# ParserType and ChunkingStrategy are referenced below but are not defined or
# imported in this file; the minimal enums here are assumptions that mirror
# the data-ingest config.
class ParserType(str, Enum):
    UNSTRUCTURED = "unstructured"
    CUSTOM = "custom"

class ChunkingStrategy(str, Enum):
    RECURSIVE = "recursive"
    CUSTOM = "custom"

class EmbeddingModel(str, Enum):
    OPENAI = "openai"
    HUGGINGFACE = "huggingface"
    CUSTOM = "custom"

class VectorDatabase(str, Enum):
    FAISS = "faiss"
    CHROMA = "chroma"
    CUSTOM = "custom"

class LoaderConfig(BaseModel):
    type: ParserType
    loader_kwargs: Optional[Dict[str, Any]] = None
    custom_class: Optional[str] = None

class ChunkSizeConfig(BaseModel):
    min: int = Field(default=100, description="Minimum chunk size")
    max: int = Field(default=500, description="Maximum chunk size")
    stepsize: int = Field(default=100, description="Step size for chunk size")

class VectorDBConfig(BaseModel):
    type: VectorDatabase
    collection_name: Optional[str] = "__DEFAULT_COLLECTION__"
    persist_directory: Optional[str] = None
    client_settings: Optional[Dict[str, Any]] = None
    metadata: Optional[Dict[str, Any]] = None
    custom_class: Optional[str] = None

class EmbeddingConfig(BaseModel):
    type: EmbeddingModel
    model: Optional[str] = None
    model_kwargs: Optional[Dict[str, Any]] = None
    custom_class: Optional[str] = None

class OptimizationConfig(BaseModel):
type: Optional[str] = "Optuna"
n_trials: Optional[int] = Field(default=10, description="Number of trials for optimization")
n_jobs: Optional[int] = Field(default=1, description="Number of jobs for optimization")
timeout: Optional[int] = Field(default=None, description="Timeout for optimization")
storage: Optional[str] = Field(default=None, description="Storage URL for Optuna (e.g., 'sqlite:///optuna.db')")
    study_name: Optional[str] = Field(default_factory=lambda: f"data_ingest_{int(time.time()*1000 + random.randint(1, 1000))}", description="Name of the Optuna study")
load_if_exists: Optional[bool] = Field(default=False, description="Load existing study if it exists")

class BaseConfig(BaseModel):
input_source: Union[str, List[str]] = Field(..., description="File path, directory path, or URL for input data")
test_dataset: str = Field(..., description="Path to CSV file containing test questions")

@classmethod
    def from_yaml(cls, file_path: str) -> 'BaseConfig':
"""
Load configuration from a YAML file.
"""
with open(file_path, 'r') as file:
config_dict = yaml.safe_load(file)
return cls(**config_dict)

def to_yaml(self, file_path: str) -> None:
"""
Save configuration to a YAML file.
"""
with open(file_path, 'w') as file:
yaml.dump(self.model_dump(), file)

class RetrieverOptionsConfig(BaseConfig):
document_loaders: Optional[List[LoaderConfig]] = Field(
default_factory=lambda: [LoaderConfig(type=ParserType.UNSTRUCTURED)],
description="Document loader configurations"
)
chunking_strategies: Optional[List[str]] = Field(
default_factory=lambda: [strategy for strategy in ChunkingStrategy if strategy != ChunkingStrategy.CUSTOM],
description="Chunking strategies to try"
)
custom_chunker: Optional[str] = Field(default=None, description="Custom chunker class. E.g., 'my_module.MyCustomChunker'")
    chunk_size: Optional[ChunkSizeConfig] = Field(default_factory=ChunkSizeConfig, description="Chunk size configuration")
    # chunk_overlap is referenced by the optimizer; the default list is an assumption
    chunk_overlap: Optional[List[int]] = Field(default=[100], description="Chunk overlap values to try")
    topk: Optional[List[int]] = Field(default=[100], description="topk values to try")
embedding_models: Optional[List[EmbeddingConfig]] = Field(
default_factory=lambda: [EmbeddingConfig(type=EmbeddingModel.HUGGINGFACE, model="sentence-transformers/all-MiniLM-L6-v2")],
description="List of embedding models"
)
vector_databases: Optional[List[VectorDBConfig]] = Field(
default_factory=lambda: [VectorDBConfig(type=VectorDatabase.FAISS, collection_name=None)],
description="List of vector databases"
)
top_k: Optional[int] = Field(default=5, description="Number of top results to consider for similarity scoring")
sampling_rate: Optional[float] = Field(default=None, description="Sampling rate for documents (0.0 to 1.0). None or 1.0 means no sampling.")
optimization: Optional[OptimizationConfig] = Field(default_factory=OptimizationConfig, description="Optimization configuration")

class DataIngestConfig(BaseConfig):
document_loader: LoaderConfig = Field(
default_factory=lambda: LoaderConfig(type=ParserType.UNSTRUCTURED),
description="Document loader configuration"
)
chunking_strategy: str = Field(default=ChunkingStrategy.RECURSIVE, description="Chunking strategy")
custom_chunker: Optional[str] = Field(default=None, description="Custom chunker class. E.g., 'my_module.MyCustomChunker'")
chunk_size: int = Field(default=1000, description="Chunk size")
chunk_overlap: int = Field(default=100, description="Chunk overlap")
embedding_model: EmbeddingConfig = Field(
default_factory=lambda: EmbeddingConfig(type=EmbeddingModel.HUGGINGFACE),
description="Embedding model configuration"
)
vector_database: VectorDBConfig = Field(
default_factory=lambda: VectorDBConfig(type=VectorDatabase.FAISS),
description="Vector store configuration"
)
top_k: int = Field(default=5, description="Number of top results to consider for similarity scoring")
sampling_rate: Optional[float] = Field(default=None, description="Sampling rate for documents (0.0 to 1.0). None or 1.0 means no sampling.")

def load_config(file_path: str) -> Union[RetrieverOptionsConfig, DataIngestConfig]:
    with open(file_path, 'r') as file:
        config_dict = yaml.safe_load(file)

    # Check for required fields
    if 'input_source' not in config_dict or 'test_dataset' not in config_dict:
        raise ValueError("Configuration must include 'input_source' and 'test_dataset'")

    # TODO: Re-think and redo this logic to see if there's a better way
    try:
        return RetrieverOptionsConfig(**config_dict)
    except ValidationError:
        # If it fails, try DataIngestConfig
        try:
            return DataIngestConfig(**config_dict)
        except ValidationError as e:
            raise ValueError(f"Invalid configuration: {str(e)}")

def save_config(config: Union[RetrieverOptionsConfig, DataIngestConfig], file_path: str) -> None:
"""
Save configuration to a YAML file.
"""
config.to_yaml(file_path)
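For illustration, a minimal round-trip through these helpers (the paths are hypothetical):

cfg = DataIngestConfig(
    input_source="./data/docs",           # hypothetical path
    test_dataset="./data/questions.csv"   # hypothetical path
)
save_config(cfg, "config.yaml")
loaded = load_config("config.yaml")  # tries RetrieverOptionsConfig first, then DataIngestConfig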
24 changes: 24 additions & 0 deletions src/ragbuilder/retriever/evaluation.py
@@ -0,0 +1,24 @@
from abc import ABC, abstractmethod
from .pipeline import DataIngestPipeline
from typing import List
import numpy as np


class Evaluator(ABC):
@abstractmethod
def evaluate(self, pipeline: DataIngestPipeline) -> float:
pass

class SimilarityEvaluator(Evaluator):
    def __init__(self, test_dataset: str, top_k=5):
        self.top_k = top_k
        # One test question per line; strip trailing newlines and skip blanks
        with open(test_dataset, 'r') as f:
            self.test_questions = [line.strip() for line in f if line.strip()]

    def evaluate(self, pipeline: DataIngestPipeline) -> float:
        # Score = mean relevance of the top-k results, averaged over all questions
        total_score = 0
        for question in self.test_questions:
            results = pipeline.indexer.similarity_search_with_relevance_scores(question, k=self.top_k)
            relevance_scores = [score for _, score in results]
            total_score += np.mean(relevance_scores)
        return total_score / len(self.test_questions)
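A minimal sketch of using the evaluator directly, mirroring how the optimizer calls it (the paths are hypothetical; DataIngestConfig and DataIngestPipeline come from the sibling modules in this commit):

from ragbuilder.retriever.config import DataIngestConfig
from ragbuilder.retriever.pipeline import DataIngestPipeline
from ragbuilder.retriever.evaluation import SimilarityEvaluator

config = DataIngestConfig(input_source="./data/docs", test_dataset="./data/questions.csv")  # hypothetical paths
pipeline = DataIngestPipeline(config)
pipeline.run()  # build the index

evaluator = SimilarityEvaluator("./data/questions.csv", top_k=5)
print(f"Mean relevance: {evaluator.evaluate(pipeline):.4f}")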
150 changes: 150 additions & 0 deletions src/ragbuilder/retriever/optimization.py
@@ -0,0 +1,150 @@
import optuna
import logging
from dataclasses import dataclass
from typing import Optional
from .config import RetrieverOptionsConfig, DataIngestConfig
from .pipeline import DataIngestPipeline
from .evaluation import Evaluator, SimilarityEvaluator

@dataclass
class OptimizerLogConfig:
"""Configuration for optimizer logging"""
log_level: int = logging.INFO
log_file: Optional[str] = None
show_progress_bar: bool = True
verbose: bool = False


class Optimizer:
    def __init__(self, options_config: RetrieverOptionsConfig, evaluator: Evaluator, log_config: OptimizerLogConfig = OptimizerLogConfig()):
self.options_config = options_config
self.evaluator = evaluator
self.embedding_model_map = {i: model for i, model in enumerate(self.options_config.embedding_models)}
self.vector_db_map = {i: db for i, db in enumerate(self.options_config.vector_databases)}
self._setup_logging(log_config)

def _setup_logging(self, log_config: OptimizerLogConfig):
self.logger = logging.getLogger(__name__)
self.logger.setLevel(log_config.log_level)

# Clear existing handlers
self.logger.handlers = []

# Console handler with formatter
console_handler = logging.StreamHandler()
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
console_handler.setFormatter(formatter)
self.logger.addHandler(console_handler)

# File handler if specified
if log_config.log_file:
file_handler = logging.FileHandler(log_config.log_file)
file_handler.setFormatter(formatter)
self.logger.addHandler(file_handler)

def optimize(self):
self.logger.info("Starting optimization process")
def objective(trial):
self.logger.info(f"Starting trial {trial.number + 1}/{self.options_config.optimization.n_trials}")

if len(self.options_config.chunking_strategies) == 1:
chunking_strategy = self.options_config.chunking_strategies[0]
else:
chunking_strategy = trial.suggest_categorical("chunking_strategy", self.options_config.chunking_strategies)
chunk_size = trial.suggest_int("chunk_size", self.options_config.chunk_size.min, self.options_config.chunk_size.max, step=self.options_config.chunk_size.stepsize)

if len(self.options_config.chunk_overlap) == 1:
chunk_overlap = self.options_config.chunk_overlap[0]
else:
chunk_overlap = trial.suggest_categorical("chunk_overlap", self.options_config.chunk_overlap)


if len(self.options_config.embedding_models) == 1:
embedding_model = self.options_config.embedding_models[0]
else:
embedding_model = self.embedding_model_map[trial.suggest_categorical("embedding_model_index", list(self.embedding_model_map.keys()))]

if len(self.options_config.vector_databases) == 1:
vector_database = self.options_config.vector_databases[0]
else:
vector_database = self.vector_db_map[trial.suggest_categorical("vector_database_index", list(self.vector_db_map.keys()))]

            # Avoid the InvalidDimensionException by persisting to a unique directory for each trial
            if vector_database.persist_directory:
                if not hasattr(self, 'original_persist_directory'):
                    self.original_persist_directory = vector_database.persist_directory
                vector_database.persist_directory = f"{self.original_persist_directory}/{trial.number}"

params = {
"input_source": self.options_config.input_source,
"test_dataset": self.options_config.test_dataset,
"chunking_strategy": chunking_strategy,
"chunk_overlap": chunk_overlap,
"chunk_size": chunk_size,
"embedding_model": embedding_model,
"vector_database": vector_database,
"top_k": self.options_config.top_k,
"sampling_rate": self.options_config.sampling_rate
}
self.logger.info(f"Trial parameters: {params}")

config = DataIngestConfig(**params)
self.logger.debug(f"Running pipeline with config: {config}")

pipeline = DataIngestPipeline(config)
index = pipeline.run()

score = self.evaluator.evaluate(pipeline)

return score

study = optuna.create_study(
storage=self.options_config.optimization.storage,
study_name=self.options_config.optimization.study_name,
load_if_exists=self.options_config.optimization.load_if_exists,
direction="maximize",
sampler=optuna.samplers.TPESampler(),
pruner=optuna.pruners.MedianPruner()
)

study.optimize(
objective,
n_trials=self.options_config.optimization.n_trials,
n_jobs=self.options_config.optimization.n_jobs,
timeout=self.options_config.optimization.timeout,
show_progress_bar=True
)

self.logger.info(f"Optimization completed. Best score: {study.best_value:.4f}")
self.logger.info(f"Best parameters: {study.best_params}")

best_config = DataIngestConfig(
input_source=self.options_config.input_source,
test_dataset=self.options_config.test_dataset,
chunking_strategy=study.best_params["chunking_strategy"] if "chunking_strategy" in study.best_params else self.options_config.chunking_strategies[0],
chunk_size=study.best_params["chunk_size"],
chunk_overlap=study.best_params["chunk_overlap"] if "chunk_overlap" in study.best_params else self.options_config.chunk_overlap[0],
embedding_model=self.embedding_model_map[study.best_params["embedding_model_index"]] if "embedding_model_index" in study.best_params else self.options_config.embedding_models[0],
vector_database=self.vector_db_map[study.best_params["vector_database_index"]] if "vector_database_index" in study.best_params else self.options_config.vector_databases[0],
top_k=self.options_config.top_k,
sampling_rate=self.options_config.sampling_rate
)
return best_config, study.best_value

def _run_optimization_core(options_config: RetrieverOptionsConfig):
evaluator = SimilarityEvaluator(options_config.test_dataset, options_config.top_k)
optimizer = Optimizer(options_config, evaluator)
best_config, best_score = optimizer.optimize()

best_pipeline = DataIngestPipeline(best_config)
best_index = best_pipeline.run()

return best_config, best_score, best_index

def run_optimization(options_config_path: str):
    options_config = RetrieverOptionsConfig.from_yaml(options_config_path)
    return _run_optimization_core(options_config)

def run_optimization_from_dict(options_config_dict: dict):
    options_config = RetrieverOptionsConfig(**options_config_dict)
    return _run_optimization_core(options_config)
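A minimal usage sketch for the optimizer entry points (the option values are illustrative assumptions, not defaults from this commit):

best_config, best_score, best_index = run_optimization_from_dict({
    "input_source": "./data/docs",           # hypothetical path
    "test_dataset": "./data/questions.csv",  # hypothetical path
    "optimization": {"n_trials": 5},
})
print(f"Best score: {best_score:.4f}")
print(best_config)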