From f3aa4563042305d767b6adbaccc49db632d01939 Mon Sep 17 00:00:00 2001 From: berkecanrizai <63911408+berkecanrizai@users.noreply.github.com> Date: Mon, 13 Jan 2025 17:15:04 +0300 Subject: [PATCH] add example notebook with langgraph (#7320) Co-authored-by: Szymon Dudycz Co-authored-by: saksham65 <112824661+Saksham65@users.noreply.github.com> GitOrigin-RevId: 92f6acf4548f1702ac7e09978c6d898023c249fb --- .../pathway_deploy_langgraph_agents.ipynb | 1034 +++++++++++++++++ .../pathway_langgraph_agentic_rag.ipynb | 921 +++++++++++++++ 2 files changed, 1955 insertions(+) create mode 100644 cookbooks/self-rag-agents/pathway_deploy_langgraph_agents.ipynb create mode 100644 cookbooks/self-rag-agents/pathway_langgraph_agentic_rag.ipynb diff --git a/cookbooks/self-rag-agents/pathway_deploy_langgraph_agents.ipynb b/cookbooks/self-rag-agents/pathway_deploy_langgraph_agents.ipynb new file mode 100644 index 0000000..d2d168d --- /dev/null +++ b/cookbooks/self-rag-agents/pathway_deploy_langgraph_agents.ipynb @@ -0,0 +1,1034 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "id": "b3e94de1-ca3d-4956-b212-4a558b68062a", + "metadata": {}, + "source": [ + "# Power and Deploy LangGraph Agents with Pathway: A Complete Guide\n", + "This notebook demonstrates how to use Pathway to power and deploy LangGraph Agents.\n", + "\n", + "In this notebook, you will learn:\n", + "- How to build a Pathway RAG app that indexes documents from your data sources\n", + "- How to create a Hybrid Index that is powered by semantic search and BM25 index\n", + "- How to use the Pathway LangChain Retriever in a LangGraph Agent\n", + "- How to use Pathway to serve LangGraph Agents" + ] + }, + { + "cell_type": "markdown", + "id": "d27aa0d3-9b11-4678-8a83-b3865b26f30c", + "metadata": {}, + "source": [ + "# Setup\n", + "\n", + "Install the required packages and set your OpenAI API key. \n", + "OpenAI is used for embeddings during the indexing stage and to power the LangGraph agent." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "209efe9f-b9d3-4760-8863-0a6052617ebd", + "metadata": {}, + "outputs": [], + "source": [ + "!pip install -U \"pathway[all]\"" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "b44f689e-0a62-4bbd-94e8-5bf31ece9cd9", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "!pip install -U langgraph langchain-community langchainhub langchain-openai" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "3ff9da9f-66b8-4675-8c45-afa10e891462", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "import os\n", + "\n", + "import json\n", + "from typing import Iterable, Literal, List\n", + "from pydantic import BaseModel, Field\n", + "\n", + "# needed for the OpenAI embedder and the LLM we will use below, you can change the embedding provider, see the documentation:\n", + "# https://pathway.com/developers/api-docs/pathway-xpacks-llm/embedders\n", + "os.environ[\"OPENAI_API_KEY\"] = \"sk-...\"" + ] + }, + { + "cell_type": "markdown", + "id": "571d4836-4d08-4ee5-97fa-37132ccc8ef9", + "metadata": {}, + "source": [ + "Lets define a `DATA_PATH` folder that will store the text files/documents for indexing." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "54979252-13d3-4b94-b2ad-ec7b012aa320", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "# folder that we will gather our docs in\n", + "DATA_PATH = \"./data\"\n", + "\n", + "os.makedirs(DATA_PATH, exist_ok=True)" + ] + }, + { + "cell_type": "markdown", + "id": "55c2fb67-c562-47f7-a08e-6a673b4b612b", + "metadata": {}, + "source": [ + "## Load & save a webpage\n", + "\n", + "Define utility functions to read content from a webpage and save it locally into `DATA_PATH`.\n", + " 1. `load_page_content`: Reads the raw text from a webpage using using a `WebBaseLoader` from `langchain_community.document_loaders`.\n", + " 2. `ingest_webpage`: Saves the loaded content as a text file into the `DATA_PATH`. " + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "64538684-5b2b-460a-b3a5-1fe737fd8269", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "import re\n", + "from urllib.parse import urlparse\n", + "from langchain_community.document_loaders import WebBaseLoader\n", + "\n", + "\n", + "def load_page_content(url: str) -> str:\n", + " \"\"\"Load web page content with Langchain utilities.\"\"\"\n", + " return WebBaseLoader(url).load()[0].page_content\n", + "\n", + "\n", + "def ingest_webpage(url: str) -> None:\n", + " \"\"\"Save a webpage to local `DATA_PATH` folder.\"\"\"\n", + " text_content = load_page_content(url)\n", + "\n", + " parsed_url = urlparse(url)\n", + " file_name = parsed_url.hostname + parsed_url.path.replace(\"/\", \"_\") + \".txt\"\n", + "\n", + " with open(os.path.join(DATA_PATH, file_name), \"w\", encoding=\"utf-8\") as f:\n", + " f.write(text_content)" + ] + }, + { + "cell_type": "markdown", + "id": "5bf15463-7da1-4d04-9f6d-423975e377cb", + "metadata": { + "tags": [] + }, + "source": [ + "Download the Self-RAG paper" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "fff7b382-2e25-415b-a3ee-06df976b3dbd", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "ingest_webpage(\"https://arxiv.org/html/2310.11511\")" + ] + }, + { + "cell_type": "markdown", + "id": "62376b16-e5bc-463d-bf84-bec69a7c2035", + "metadata": { + "tags": [] + }, + "source": [ + "# Build the Pathway indexing pipeline\n", + "\n", + "Set up Pathway to read and index the documents saved under the `DATA_PATH`.\n", + "\n", + "\n", + "1. [Connectors](https://pathway.com/developers/user-guide/connect/pathway-connectors): Use Pathway’s file reader to ingest all text files under the `DATA_PATH`.\n", + "2. [Parsers](https://pathway.com/developers/api-docs/pathway-xpacks-llm/parsers): Utilize the ParseUnstructured to parse the documents. This parser supports multiple file types, including PDF, DOCX, and PPTX.\n", + "3. [Text Splitters](https://pathway.com/developers/api-docs/pathway-xpacks-llm/splitters): Split the document content into chunks.\n", + "4. [Embedders](https://pathway.com/developers/api-docs/pathway-xpacks-llm/embedders): Use OpenAI API for embeddings." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "27c3d42a-30e1-47f8-bf3c-bd44d4a490f0", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "# host and port of the RAG app\n", + "pathway_host: str = \"0.0.0.0\"\n", + "pathway_port: int = 8000" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "094ec4e1-e972-441b-9c47-ff0ce3c8d2ff", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "import pathway as pw\n", + "from pathway.stdlib.indexing import BruteForceKnnFactory, HybridIndexFactory\n", + "from pathway.stdlib.indexing.bm25 import TantivyBM25Factory\n", + "from pathway.udfs import DiskCache\n", + "from pathway.xpacks.llm import embedders, llms, parsers, splitters\n", + "from pathway.xpacks.llm.document_store import DocumentStore\n", + "from pathway.xpacks.llm.question_answering import BaseRAGQuestionAnswerer, RAGClient\n", + "\n", + "\n", + "# read the text files under the data folder, we can also read from Google Drive, Sharepoint, etc.\n", + "# See connectors documentation: https://pathway.com/developers/user-guide/connect/pathway-connectors to learn more\n", + "folder = pw.io.fs.read(\n", + " path=f\"{DATA_PATH}/*.txt\",\n", + " format=\"binary\",\n", + " with_metadata=True,\n", + ")\n", + "\n", + "# list of data sources to be indexed\n", + "sources = [folder]\n", + "\n", + "# define the document processing steps\n", + "parser = parsers.ParseUnstructured()\n", + "\n", + "text_splitter = splitters.TokenCountSplitter(min_tokens=150, max_tokens=450)\n", + "\n", + "embedder = embedders.OpenAIEmbedder(cache_strategy=DiskCache())" + ] + }, + { + "cell_type": "markdown", + "id": "d3426ef8-90ee-4023-8094-5ce007d64b0e", + "metadata": {}, + "source": [ + "Hybrid index combines semantic search and keyword based BM25 search.\n", + "\n", + "[`HybridIndexFactory`](https://pathway.com/developers/api-docs/indexing#pathway.stdlib.indexing.HybridIndexFactory) combines different indexes to build an hybrid index:\n", + " 1. [BM25](https://pathway.com/developers/api-docs/indexing#pathway.stdlib.indexing.TantivyBM25) (via TantivyBM25Factory) → Keyword based BM25 search.\n", + " 2. [BruteForceKnn](https://pathway.com/developers/api-docs/indexing#pathway.stdlib.indexing.BruteForceKnn) → Vector-based semantic search" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "a3f7c59b-925b-4ef3-88a8-89749479925e", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "index = HybridIndexFactory(\n", + " [\n", + " TantivyBM25Factory(),\n", + " BruteForceKnnFactory(embedder=embedder),\n", + " ]\n", + ")" + ] + }, + { + "cell_type": "markdown", + "id": "8b55a446-cf97-4d47-b1ca-374bcf0e8a7b", + "metadata": {}, + "source": [ + "[DocumentStore](https://pathway.com/developers/api-docs/pathway-xpacks-llm/document_store#pathway.xpacks.llm.document_store.DocumentStore) manages document ingestion, parsing, splitting, and indexing.\n", + "\n", + "[BaseRAGQuestionAnswerer](https://pathway.com/developers/api-docs/pathway-xpacks-llm/question_answering#pathway.xpacks.llm.question_answering.BaseRAGQuestionAnswerer) creates a Pathway “RAG” application that:\n", + " * Indexes the documents (via `document_store`)\n", + " * Exposes a standard question-answering interface\n", + " * Enables us to create endpoint for the agent with Pathway." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "7db0ab59-a641-43cc-901b-cc2713f03a99", + "metadata": {}, + "outputs": [], + "source": [ + "llm = llms.OpenAIChat(model=\"gpt-4o-mini\", cache_strategy=DiskCache())\n", + "\n", + "document_store = DocumentStore(\n", + " docs=sources, parser=parser, splitter=text_splitter, retriever_factory=index\n", + ")\n", + "\n", + "# create the RAG app that will power the index, and serve the agent endpoint\n", + "rag_app = BaseRAGQuestionAnswerer(\n", + " llm=llm,\n", + " indexer=document_store,\n", + ")" + ] + }, + { + "cell_type": "markdown", + "id": "a027e277-fba8-4198-863b-2168ae6f8505", + "metadata": {}, + "source": [ + "# Create the LangGraph agent" + ] + }, + { + "cell_type": "markdown", + "id": "cf15141f-9612-4d0d-a21b-11bd8530686a", + "metadata": {}, + "source": [ + "### Create the Langchain Retriever\n", + "\n", + "You can now query Pathway and access up-to-date documents for your RAG applications from LangChain using [PathwayVectorClient](https://api.python.langchain.com/en/latest/vectorstores/langchain_community.vectorstores.pathway.PathwayVectorClient.html)\n", + "\n", + "The Langchain Retriever is a wrapper around the Pathway client, you will use it in `retrieve` part stage of the Agent" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "fefffab6-d5a3-4925-a52d-5a5c574421ea", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "from langchain_community.vectorstores import PathwayVectorClient" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "51d365aa-5526-4ec8-932d-e65e83ee355d", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "vectorstore_client = PathwayVectorClient(pathway_host, pathway_port)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "59624377-8481-4407-8ee3-0f014bdff30e", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "# this will not be able to fetch data until the RAG app is running\n", + "retriever = vectorstore_client.as_retriever()" + ] + }, + { + "attachments": { + "76895b7a-fcc5-4758-9fbb-510b17fdeda4.png": { + "image/png": "" + } + }, + "cell_type": "markdown", + "id": "5894115c-0323-4098-83be-da49d949a53c", + "metadata": {}, + "source": [ + "### Self-RAG\n", + "\n", + "Self-RAG is a strategy for RAG that incorporates self-reflection / self-grading on retrieved documents and generations.\n", + "\n", + "In the [paper](https://arxiv.org/abs/2310.11511), a few decisions are made:\n", + "\n", + "1. Should I retrieve from retriever, `R` -\n", + "\n", + "* Input: `x (question)` OR `x (question)`, `y (generation)`\n", + "* Decides when to retrieve `D` chunks with `R`\n", + "* Output: `yes, no, continue`\n", + "\n", + "2. Are the retrieved passages `D` relevant to the question `x` -\n", + "\n", + "* * Input: (`x (question)`, `d (chunk)`) for `d` in `D`\n", + "* `d` provides useful information to solve `x`\n", + "* Output: `relevant, irrelevant`\n", + "\n", + "3. Are the LLM generation from each chunk in `D` is relevant to the chunk (hallucinations, etc) -\n", + "\n", + "* Input: `x (question)`, `d (chunk)`, `y (generation)` for `d` in `D`\n", + "* All of the verification-worthy statements in `y (generation)` are supported by `d`\n", + "* Output: `{fully supported, partially supported, no support`\n", + "\n", + "4. The LLM generation from each chunk in `D` is a useful response to `x (question)` -\n", + "\n", + "* Input: `x (question)`, `y (generation)` for `d` in `D`\n", + "* `y (generation)` is a useful response to `x (question)`.\n", + "* Output: `{5, 4, 3, 2, 1}`\n", + "\n", + "We will implement some of these ideas from scratch using [LangGraph](https://langchain-ai.github.io/langgraph/).\n", + "![image.png](attachment:76895b7a-fcc5-4758-9fbb-510b17fdeda4.png)" + ] + }, + { + "cell_type": "markdown", + "id": "1def042d-abcc-47af-8926-a9d1e2e8154f", + "metadata": {}, + "source": [ + "### Define the LangGraph Workflow\n", + "\n", + "\n", + "The LangGraph agent operates as a multi-step workflow.\n", + "\n", + "- It retrieves documents, grades their relevance, generates an answer, and ensures factual accuracy (e.g., no hallucinations).\n", + "- You can also implement self-reflection and query optimization using LangGraph nodes." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "8c4c9be8-396c-4ebc-a9e4-d4e11504c05c", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "from langchain_core.prompts import ChatPromptTemplate\n", + "from langchain_core.pydantic_v1 import BaseModel, Field\n", + "from langchain_openai import ChatOpenAI\n", + "\n", + "\n", + "# Data model\n", + "class GradeDocuments(BaseModel):\n", + " \"\"\"Binary score for relevance check on retrieved documents.\"\"\"\n", + "\n", + " binary_score: str = Field(\n", + " description=\"Documents are relevant to the question, 'yes' or 'no'\"\n", + " )\n", + "\n", + "\n", + "# LLM with function call\n", + "llm = ChatOpenAI(model=\"gpt-4o-mini\", temperature=0)\n", + "structured_llm_grader = llm.with_structured_output(GradeDocuments)\n", + "\n", + "# Prompt\n", + "system = \"\"\"You are a grader assessing relevance of a retrieved document to a user question. \\n \n", + " It does not need to be a stringent test. The goal is to filter out erroneous retrievals. \\n\n", + " If the document contains keyword(s) or semantic meaning related to the user question, grade it as relevant. \\n\n", + " Give a binary score 'yes' or 'no' score to indicate whether the document is relevant to the question.\"\"\"\n", + "grade_prompt = ChatPromptTemplate.from_messages(\n", + " [\n", + " (\"system\", system),\n", + " (\"human\", \"Retrieved document: \\n\\n {document} \\n\\n User question: {question}\"),\n", + " ]\n", + ")\n", + "\n", + "retrieval_grader = grade_prompt | structured_llm_grader" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "499173a5-8df0-4fe6-bb46-552ce496b8ba", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "from langchain import hub\n", + "from langchain_core.output_parsers import StrOutputParser\n", + "\n", + "# Prompt\n", + "prompt = hub.pull(\"rlm/rag-prompt\")\n", + "\n", + "# LLM\n", + "llm = ChatOpenAI(model_name=\"gpt-3.5-turbo\", temperature=0)\n", + "\n", + "\n", + "# Post-processing\n", + "def format_docs(docs):\n", + " return \"\\n\\n\".join(doc.page_content for doc in docs)\n", + "\n", + "\n", + "# Chain\n", + "rag_chain = prompt | llm | StrOutputParser()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "d9055d3b-8e9b-494f-bf4f-d02c153639c7", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "class GradeHallucinations(BaseModel):\n", + " \"\"\"Binary score for hallucination present in generation answer.\"\"\"\n", + "\n", + " binary_score: str = Field(\n", + " description=\"Answer is grounded in the facts, 'yes' or 'no'\"\n", + " )\n", + "\n", + "\n", + "# LLM with function call\n", + "llm = ChatOpenAI(model=\"gpt-4o-mini\", temperature=0)\n", + "structured_llm_grader = llm.with_structured_output(GradeHallucinations)\n", + "\n", + "# Prompt\n", + "system = \"\"\"You are a grader assessing whether an LLM generation is grounded in / supported by a set of retrieved facts. \\n \n", + " Give a binary score 'yes' or 'no'. 'Yes' means that the answer is grounded in / supported by the set of facts.\"\"\"\n", + "hallucination_prompt = ChatPromptTemplate.from_messages(\n", + " [\n", + " (\"system\", system),\n", + " (\"human\", \"Set of facts: \\n\\n {documents} \\n\\n LLM generation: {generation}\"),\n", + " ]\n", + ")\n", + "\n", + "hallucination_grader = hallucination_prompt | structured_llm_grader" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "f92d351d-5ee1-48cc-ad8f-9b57ceed18ed", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "class GradeAnswer(BaseModel):\n", + " \"\"\"Binary score to assess answer addresses question.\"\"\"\n", + "\n", + " binary_score: str = Field(\n", + " description=\"Answer addresses the question, 'yes' or 'no'\"\n", + " )\n", + "\n", + "\n", + "# LLM with function call\n", + "llm = ChatOpenAI(model=\"gpt-4o-mini\", temperature=0)\n", + "structured_llm_grader = llm.with_structured_output(GradeAnswer)\n", + "\n", + "# Prompt\n", + "system = \"\"\"You are a grader assessing whether an answer addresses / resolves a question \\n \n", + " Give a binary score 'yes' or 'no'. Yes' means that the answer resolves the question.\"\"\"\n", + "answer_prompt = ChatPromptTemplate.from_messages(\n", + " [\n", + " (\"system\", system),\n", + " (\"human\", \"User question: \\n\\n {question} \\n\\n LLM generation: {generation}\"),\n", + " ]\n", + ")\n", + "\n", + "answer_grader = answer_prompt | structured_llm_grader" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "286a9a2a-2a62-4004-84a2-fc82e6996cf1", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "llm = ChatOpenAI(model=\"gpt-4o-mini\", temperature=0)\n", + "\n", + "# Prompt\n", + "system = \"\"\"You a question re-writer that converts an input question to a better version that is optimized \\n \n", + " for vectorstore retrieval. Look at the input and try to reason about the underlying semantic intent / meaning.\"\"\"\n", + "re_write_prompt = ChatPromptTemplate.from_messages(\n", + " [\n", + " (\"system\", system),\n", + " (\n", + " \"human\",\n", + " \"Here is the initial question: \\n\\n {question} \\n Formulate an improved question.\",\n", + " ),\n", + " ]\n", + ")\n", + "\n", + "question_rewriter = re_write_prompt | llm | StrOutputParser()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "cb9bd597-a683-433a-879d-7991cfed5cf7", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "from typing import List\n", + "\n", + "from typing_extensions import TypedDict\n", + "\n", + "\n", + "class GraphState(TypedDict):\n", + " \"\"\"\n", + " Represents the state of our graph.\n", + "\n", + " Attributes:\n", + " question: question\n", + " generation: LLM generation\n", + " documents: list of documents\n", + " \"\"\"\n", + "\n", + " question: str\n", + " generation: str\n", + " documents: List[str]" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "398e5cc2-9c41-465d-a4f4-93c82ed452f2", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "def retrieve(state):\n", + " \"\"\"\n", + " Retrieve documents\n", + "\n", + " Args:\n", + " state (dict): The current graph state\n", + "\n", + " Returns:\n", + " state (dict): New key added to state, documents, that contains retrieved documents\n", + " \"\"\"\n", + " print(\"---RETRIEVE---\")\n", + " question = state[\"question\"]\n", + "\n", + " # Retrieval\n", + " documents = retriever.get_relevant_documents(question)\n", + " return {\"documents\": documents, \"question\": question}\n", + "\n", + "\n", + "def generate(state):\n", + " \"\"\"\n", + " Generate answer\n", + "\n", + " Args:\n", + " state (dict): The current graph state\n", + "\n", + " Returns:\n", + " state (dict): New key added to state, generation, that contains LLM generation\n", + " \"\"\"\n", + " print(\"---GENERATE---\")\n", + " question = state[\"question\"]\n", + " documents = state[\"documents\"]\n", + "\n", + " # RAG generation\n", + " generation = rag_chain.invoke({\"context\": documents, \"question\": question})\n", + " return {\"documents\": documents, \"question\": question, \"generation\": generation}\n", + "\n", + "\n", + "def grade_documents(state):\n", + " \"\"\"\n", + " Determines whether the retrieved documents are relevant to the question.\n", + "\n", + " Args:\n", + " state (dict): The current graph state\n", + "\n", + " Returns:\n", + " state (dict): Updates documents key with only filtered relevant documents\n", + " \"\"\"\n", + "\n", + " print(\"---CHECK DOCUMENT RELEVANCE TO QUESTION---\")\n", + " question = state[\"question\"]\n", + " documents = state[\"documents\"]\n", + "\n", + " # Score each doc\n", + " filtered_docs = []\n", + " for d in documents:\n", + " score = retrieval_grader.invoke(\n", + " {\"question\": question, \"document\": d.page_content}\n", + " )\n", + " grade = score.binary_score\n", + " if grade == \"yes\":\n", + " print(\"---GRADE: DOCUMENT RELEVANT---\")\n", + " filtered_docs.append(d)\n", + " else:\n", + " print(\"---GRADE: DOCUMENT NOT RELEVANT---\")\n", + " continue\n", + " return {\"documents\": filtered_docs, \"question\": question}\n", + "\n", + "\n", + "def transform_query(state):\n", + " \"\"\"\n", + " Transform the query to produce a better question.\n", + "\n", + " Args:\n", + " state (dict): The current graph state\n", + "\n", + " Returns:\n", + " state (dict): Updates question key with a re-phrased question\n", + " \"\"\"\n", + "\n", + " print(\"---TRANSFORM QUERY---\")\n", + " question = state[\"question\"]\n", + " documents = state[\"documents\"]\n", + "\n", + " # Re-write question\n", + " better_question = question_rewriter.invoke({\"question\": question})\n", + " return {\"documents\": documents, \"question\": better_question}\n", + "\n", + "\n", + "### Edges\n", + "\n", + "\n", + "def decide_to_generate(state):\n", + " \"\"\"\n", + " Determines whether to generate an answer, or re-generate a question.\n", + "\n", + " Args:\n", + " state (dict): The current graph state\n", + "\n", + " Returns:\n", + " str: Binary decision for next node to call\n", + " \"\"\"\n", + "\n", + " print(\"---ASSESS GRADED DOCUMENTS---\")\n", + " state[\"question\"]\n", + " filtered_documents = state[\"documents\"]\n", + "\n", + " if not filtered_documents:\n", + " # All documents have been filtered check_relevance\n", + " # We will re-generate a new query\n", + " print(\n", + " \"---DECISION: ALL DOCUMENTS ARE NOT RELEVANT TO QUESTION, TRANSFORM QUERY---\"\n", + " )\n", + " return \"transform_query\"\n", + " else:\n", + " # We have relevant documents, so generate answer\n", + " print(\"---DECISION: GENERATE---\")\n", + " return \"generate\"\n", + "\n", + "\n", + "def grade_generation_v_documents_and_question(state):\n", + " \"\"\"\n", + " Determines whether the generation is grounded in the document and answers question.\n", + "\n", + " Args:\n", + " state (dict): The current graph state\n", + "\n", + " Returns:\n", + " str: Decision for next node to call\n", + " \"\"\"\n", + "\n", + " print(\"---CHECK HALLUCINATIONS---\")\n", + " question = state[\"question\"]\n", + " documents = state[\"documents\"]\n", + " generation = state[\"generation\"]\n", + "\n", + " score = hallucination_grader.invoke(\n", + " {\"documents\": documents, \"generation\": generation}\n", + " )\n", + " grade = score.binary_score\n", + "\n", + " # Check hallucination\n", + " if grade == \"yes\":\n", + " print(\"---DECISION: GENERATION IS GROUNDED IN DOCUMENTS---\")\n", + " # Check question-answering\n", + " print(\"---GRADE GENERATION vs QUESTION---\")\n", + " score = answer_grader.invoke({\"question\": question, \"generation\": generation})\n", + " grade = score.binary_score\n", + " if grade == \"yes\":\n", + " print(\"---DECISION: GENERATION ADDRESSES QUESTION---\")\n", + " return \"useful\"\n", + " else:\n", + " print(\"---DECISION: GENERATION DOES NOT ADDRESS QUESTION---\")\n", + " return \"not useful\"\n", + " else:\n", + " pprint(\"---DECISION: GENERATION IS NOT GROUNDED IN DOCUMENTS, RE-TRY---\")\n", + " return \"not supported\"" + ] + }, + { + "cell_type": "markdown", + "id": "b77061d5-fa7f-4dbc-ad5b-cc409b931bf4", + "metadata": {}, + "source": [ + "### Compose the graph" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "f5ae35f7-ff5c-40f1-9b0e-1a8bac0dc58f", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "from langgraph.graph import END, StateGraph, START\n", + "\n", + "workflow = StateGraph(GraphState)\n", + "\n", + "# Define the nodes\n", + "workflow.add_node(\"retrieve\", retrieve) # retrieve\n", + "workflow.add_node(\"grade_documents\", grade_documents) # grade documents\n", + "workflow.add_node(\"generate\", generate) # generatae\n", + "workflow.add_node(\"transform_query\", transform_query) # transform_query\n", + "\n", + "# Build graph\n", + "workflow.add_edge(START, \"retrieve\")\n", + "workflow.add_edge(\"retrieve\", \"grade_documents\")\n", + "workflow.add_conditional_edges(\n", + " \"grade_documents\",\n", + " decide_to_generate,\n", + " {\n", + " \"transform_query\": \"transform_query\",\n", + " \"generate\": \"generate\",\n", + " },\n", + ")\n", + "workflow.add_edge(\"transform_query\", \"retrieve\")\n", + "workflow.add_conditional_edges(\n", + " \"generate\",\n", + " grade_generation_v_documents_and_question,\n", + " {\n", + " \"not supported\": \"generate\",\n", + " \"useful\": END,\n", + " \"not useful\": \"transform_query\",\n", + " },\n", + ")\n", + "\n", + "# Compile\n", + "app = workflow.compile()" + ] + }, + { + "cell_type": "markdown", + "id": "598a9ef4-cfb2-479a-9a74-b82c5781daca", + "metadata": {}, + "source": [ + "# Serve the agent\n", + "\n", + "`BaseRAGQuestionAnswerer` provides `serve_callable` that lets you attach a custom Python function to an endpoint. This way, you can create your own endpoints on the Pathway server in addition to the [built-in ones](https://pathway.com/developers/ai-pipelines/rest-api).\n", + "\n", + "Use the `serve_callable` method to expose the LangGraph agent as an HTTP endpoint.\n", + "The `/agent` endpoint accepts user queries, processes them through the LangGraph pipeline, and returns the final answer." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "fafc447a-c221-4251-ad40-c56cb77c338c", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "@rag_app.serve_callable(route=\"/agent\")\n", + "async def call_agent(user_query: str) -> str:\n", + " langgraph_input = {\"question\": user_query}\n", + " result = app.invoke(langgraph_input)\n", + " return result[\"generation\"]" + ] + }, + { + "cell_type": "markdown", + "id": "a71d3943-faf8-4f74-9f9d-a8f914db2acb", + "metadata": {}, + "source": [ + "The above Pathway endpoint,\n", + "* Takes a `user_query` as the JSON input.\n", + "* Passes it into the compiled LangGraph pipeline.\n", + "* Returns the final answer from the state (result[\"generation\"])" + ] + }, + { + "cell_type": "markdown", + "id": "6e86ac7c-db40-4636-b704-77df7c0d18a6", + "metadata": {}, + "source": [ + "# Build and Run the Pathway server" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "963e891e-4ccf-4186-a3f2-19997b44bb86", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "# This creates and runs the index, also serves the agent endpoint defined above\n", + "\n", + "rag_app.build_server(pathway_host, pathway_port)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "065f68f6-99b9-4d5e-8abf-796fdf67c050", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "t = rag_app.run_server(threaded=True)" + ] + }, + { + "cell_type": "markdown", + "id": "95b9772b-42ce-422f-bd40-56ba317a8249", + "metadata": {}, + "source": [ + "List the indexed documents.\n", + "\n", + "This request may take a while since app is parsing the documents and building the index." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "8bc8ab3c-cfb6-41e6-a102-281caa1bb19e", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "pathway_client = RAGClient(pathway_host, pathway_port)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "aebfab0c-c827-4a47-a5b4-36361c2c68c1", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "pathway_client.pw_list_documents()" + ] + }, + { + "cell_type": "markdown", + "id": "1e682805-8d61-4054-aab0-47044d921157", + "metadata": {}, + "source": [ + "# Run with a sample question\n", + "\n", + "Test the `/agent` endpoint by sending a sample question. The server runs the entire RAG pipeline and returns the answer." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "4676f4c8-26bd-4f36-aadf-80771b001b59", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "question: str = (\n", + " \"How does self-RAG work? Explain the steps involved in the implementation.\"\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "b84144d8-e266-4a13-a515-a55fcdc02f11", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "import requests\n", + "\n", + "# send request to the endpoint we defined above, put the expected fields as payload\n", + "response = requests.post(\n", + " pathway_client.url + \"/agent\",\n", + " json={\"user_query\": question},\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "a5689309-1780-4725-8d3f-364841303def", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "response.json()" + ] + }, + { + "cell_type": "markdown", + "id": "35051a4c-1c70-4631-9005-5a06520640e1", + "metadata": { + "tags": [] + }, + "source": [ + "## Summary\n", + "\n", + "1. **Data Ingestion:** Ingest a webpage (the Self-RAG arxiv page).\n", + "2. **Indexing:** Pathway reads these text documents, parses, splits, and indexes them using a hybrid BM25 + semantic index.\n", + "3. **LangGraph Agent:** Define a multi-step agent that:\n", + " * Re-writes the query if needed.\n", + " * Retrieves documents via the Pathway VectorStore.\n", + " * Grades those docs for relevance.\n", + " * Generates an answer.\n", + " * Grades that answer for hallucination and for how well it addresses the user’s question.\n", + " * Loops as necessary until it gets a satisfactory final response.\n", + "4. **Serving:** Pathway’s built-in server is launched, hosting an `/agent` endpoint for queries.\n", + "\n", + "\n", + "Thus, whenever you POST a `\"user_query\"` to the `/agent` endpoint, the agent pipeline runs, and returns the final answer." + ] + }, + { + "cell_type": "markdown", + "id": "512f527d-400e-44af-add3-30a1b5a3c844", + "metadata": {}, + "source": [ + "> This notebook was inspired by this [LangGraph cookbook](https://github.com/langchain-ai/langgraph/blob/e3ca7bb3e9d34b09633852f4d08d55f6dcd4364b/examples/rag/langgraph_self_rag.ipynb)" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3 (ipykernel)", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.11.11" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/cookbooks/self-rag-agents/pathway_langgraph_agentic_rag.ipynb b/cookbooks/self-rag-agents/pathway_langgraph_agentic_rag.ipynb new file mode 100644 index 0000000..003c47f --- /dev/null +++ b/cookbooks/self-rag-agents/pathway_langgraph_agentic_rag.ipynb @@ -0,0 +1,921 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "id": "9b009d03-9465-4b1a-b64d-50a05c0dfbc5", + "metadata": {}, + "source": [ + "# LangGraph Agents Powered by Pathway: A Complete Guide\n", + "\n", + "This notebook demonstrates how to use Pathway VectorStore as a LangChain Retriever to power LangGraph Agents that are synced with your documents.\n", + "\n", + "In this notebook, you will learn:\n", + "- How to build a Pathway vector store that indexes documents from your data sources\n", + "- How to create a vector index that enables semantic search through your documents\n", + "- How to use the Pathway LangChain Retriever in a LangGraph Agent" + ] + }, + { + "cell_type": "markdown", + "id": "727099e5-e1d8-4f20-8240-d74ec1299ddf", + "metadata": {}, + "source": [ + "# Setup\n", + "\n", + "Install the required packages and set your OpenAI API key. \n", + "OpenAI is used for embeddings during the indexing stage and to power the LangGraph agent." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "7bdea4ea-882e-4043-8cda-4671690b33a1", + "metadata": {}, + "outputs": [], + "source": [ + "!pip install -U \"pathway[all]\"" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "88ec0f0e-7fa2-457f-a3d7-49a158653e24", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "!pip install -U langgraph langchain-community langchainhub langchain-openai" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "3ff9da9f-66b8-4675-8c45-afa10e891462", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "import os\n", + "\n", + "import json\n", + "from typing import Iterable, Literal, List\n", + "from pydantic import BaseModel, Field\n", + "\n", + "# needed for the OpenAI embedder and the LLM we will use below, you can change the embedding provider, see the documentation:\n", + "# https://pathway.com/developers/api-docs/pathway-xpacks-llm/embedders\n", + "os.environ[\"OPENAI_API_KEY\"] = \"sk-...\"" + ] + }, + { + "cell_type": "markdown", + "id": "5b04f68d-2ca4-43e5-9c8d-c8392dd65edb", + "metadata": {}, + "source": [ + "Lets define a `DATA_PATH` folder that will store the text files/documents for indexing." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "54979252-13d3-4b94-b2ad-ec7b012aa320", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "# folder that we will gather our docs in\n", + "DATA_PATH = \"./data\"\n", + "\n", + "os.makedirs(DATA_PATH, exist_ok=True)" + ] + }, + { + "cell_type": "markdown", + "id": "8169934b-9a2c-433e-b034-8a97ead8d25b", + "metadata": {}, + "source": [ + "## Load & save a webpage\n", + "\n", + "Define utility functions to read content from a webpage and save it locally into `DATA_PATH`.\n", + " 1. `load_page_content`: Reads the raw text from a webpage using using a `WebBaseLoader` from `langchain_community.document_loaders`.\n", + " 2. `ingest_webpage`: Saves the loaded content as a text file into the `DATA_PATH`. " + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "64538684-5b2b-460a-b3a5-1fe737fd8269", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "import re\n", + "from urllib.parse import urlparse\n", + "from langchain_community.document_loaders import WebBaseLoader\n", + "\n", + "\n", + "def load_page_content(url: str) -> str:\n", + " \"\"\"Load web page content with Langchain utilities.\"\"\"\n", + " return WebBaseLoader(url).load()[0].page_content\n", + "\n", + "\n", + "def ingest_webpage(url: str) -> None:\n", + " \"\"\"Save a webpage to local `DATA_PATH` folder.\"\"\"\n", + " text_content = load_page_content(url)\n", + "\n", + " parsed_url = urlparse(url)\n", + " file_name = parsed_url.hostname + parsed_url.path.replace(\"/\", \"_\") + \".txt\"\n", + "\n", + " with open(os.path.join(DATA_PATH, file_name), \"w\", encoding=\"utf-8\") as f:\n", + " f.write(text_content)" + ] + }, + { + "cell_type": "markdown", + "id": "5bf15463-7da1-4d04-9f6d-423975e377cb", + "metadata": { + "tags": [] + }, + "source": [ + "Download the Self-RAG paper" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "fff7b382-2e25-415b-a3ee-06df976b3dbd", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "ingest_webpage(\"https://arxiv.org/html/2310.11511\")" + ] + }, + { + "cell_type": "markdown", + "id": "f7e2f21e-2259-49f8-b295-0f442e35c337", + "metadata": { + "tags": [] + }, + "source": [ + "# Build the Pathway indexing pipeline\n", + "\n", + "Set up Pathway to read and index the documents saved under the `DATA_PATH`.\n", + "\n", + "\n", + "1. [Connectors](https://pathway.com/developers/user-guide/connect/pathway-connectors): Use Pathway’s file reader to ingest all text files under the `DATA_PATH`.\n", + "2. [Parsers](https://pathway.com/developers/api-docs/pathway-xpacks-llm/parsers): Utilize the ParseUnstructured to parse the documents. This parser supports multiple file types, including PDF, DOCX, and PPTX.\n", + "3. [Text Splitters](https://pathway.com/developers/api-docs/pathway-xpacks-llm/splitters): Split the document content into chunks.\n", + "4. [Embedders](https://pathway.com/developers/api-docs/pathway-xpacks-llm/embedders): Use OpenAI API for embeddings.\n", + "5. [VectorStore](https://pathway.com/developers/api-docs/pathway-xpacks-llm/vectorstore): Orchestrates all the above modules." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "27c3d42a-30e1-47f8-bf3c-bd44d4a490f0", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "# host and port of the RAG app\n", + "pathway_host: str = \"0.0.0.0\"\n", + "pathway_port: int = 8000" + ] + }, + { + "cell_type": "markdown", + "id": "3c68a48a-8263-4b52-8c5a-206baefd22d1", + "metadata": {}, + "source": [ + "Create the vector store that will power the index of our RAG application. \n", + "\n", + "We use the modules under the Pathway Xpacks to create a suitable vector store." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "094ec4e1-e972-441b-9c47-ff0ce3c8d2ff", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "import pathway as pw\n", + "\n", + "from pathway.xpacks.llm.vector_store import VectorStoreServer, VectorStoreClient\n", + "from pathway.xpacks.llm import (\n", + " embedders,\n", + " llms,\n", + " parsers,\n", + " splitters,\n", + ")\n", + "from pathway.udfs import DiskCache\n", + "\n", + "# read the text files under the data folder, we can also read from Google Drive, Sharepoint, etc.\n", + "# See connectors documentation: https://pathway.com/developers/user-guide/connect/pathway-connectors to learn more\n", + "folder = pw.io.fs.read(\n", + " path=f\"{DATA_PATH}/*.txt\",\n", + " format=\"binary\",\n", + " with_metadata=True,\n", + ")\n", + "\n", + "# list of data sources to be indexed\n", + "sources = [folder]\n", + "\n", + "# define the document processing steps\n", + "parser = parsers.ParseUnstructured()\n", + "\n", + "text_splitter = splitters.TokenCountSplitter(min_tokens=150, max_tokens=450)\n", + "\n", + "embedder = embedders.OpenAIEmbedder(cache_strategy=DiskCache())\n", + "\n", + "vector_server = VectorStoreServer(\n", + " *sources,\n", + " embedder=embedder,\n", + " splitter=text_splitter,\n", + " parser=parser,\n", + ")\n", + "\n", + "# deploy the vector store locally\n", + "t = vector_server.run_server(pathway_host, pathway_port, threaded=True)" + ] + }, + { + "cell_type": "markdown", + "id": "34cc8823-e184-47c0-aa09-7b4035817e7d", + "metadata": {}, + "source": [ + "[VectorStore](https://pathway.com/developers/api-docs/pathway-xpacks-llm/vectorstore) manages document ingestion, parsing, splitting, and indexing." + ] + }, + { + "cell_type": "markdown", + "id": "b2742133-a7ef-49a8-82d1-093c2d2ff2d2", + "metadata": {}, + "source": [ + "List the indexed documents" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "8bc8ab3c-cfb6-41e6-a102-281caa1bb19e", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "pathway_client = VectorStoreClient(pathway_host, pathway_port)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "aebfab0c-c827-4a47-a5b4-36361c2c68c1", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "pathway_client.get_input_files()" + ] + }, + { + "cell_type": "markdown", + "id": "ba734b24-63bd-4b22-a3cb-fed2393a60f4", + "metadata": {}, + "source": [ + "# Create the LangGraph agent" + ] + }, + { + "cell_type": "markdown", + "id": "cf15141f-9612-4d0d-a21b-11bd8530686a", + "metadata": {}, + "source": [ + "### Create the Langchain Retriever\n", + "\n", + "You can now query Pathway and access up-to-date documents for your RAG applications from LangChain using [PathwayVectorClient](https://api.python.langchain.com/en/latest/vectorstores/langchain_community.vectorstores.pathway.PathwayVectorClient.html)\n", + "\n", + "The Langchain Retriever is a wrapper around the Pathway client, you will use it in `retrieve` part stage of the Agent" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "fefffab6-d5a3-4925-a52d-5a5c574421ea", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "from langchain_community.vectorstores import PathwayVectorClient" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "51d365aa-5526-4ec8-932d-e65e83ee355d", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "vectorstore_client = PathwayVectorClient(pathway_host, pathway_port)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "59624377-8481-4407-8ee3-0f014bdff30e", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "retriever = vectorstore_client.as_retriever()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "4623f53b-013b-4bc5-8c7c-e994a2e35c57", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "# test the retriever\n", + "# retriever.invoke({\"question\": \"evaluation\"})" + ] + }, + { + "attachments": { + "ac58a69e-7bf3-481f-baea-1b9d6c6eba00.png": { + "image/png": "" + } + }, + "cell_type": "markdown", + "id": "7a3c29d5-7f69-4186-8bfd-d03b8cbee280", + "metadata": {}, + "source": [ + "### Self-RAG\n", + "\n", + "Self-RAG is a strategy for RAG that incorporates self-reflection / self-grading on retrieved documents and generations.\n", + "\n", + "In the [paper](https://arxiv.org/abs/2310.11511), a few decisions are made:\n", + "\n", + "1. Should I retrieve from retriever, `R` -\n", + "\n", + "* Input: `x (question)` OR `x (question)`, `y (generation)`\n", + "* Decides when to retrieve `D` chunks with `R`\n", + "* Output: `yes, no, continue`\n", + "\n", + "2. Are the retrieved passages `D` relevant to the question `x` -\n", + "\n", + "* * Input: (`x (question)`, `d (chunk)`) for `d` in `D`\n", + "* `d` provides useful information to solve `x`\n", + "* Output: `relevant, irrelevant`\n", + "\n", + "3. Are the LLM generation from each chunk in `D` is relevant to the chunk (hallucinations, etc) -\n", + "\n", + "* Input: `x (question)`, `d (chunk)`, `y (generation)` for `d` in `D`\n", + "* All of the verification-worthy statements in `y (generation)` are supported by `d`\n", + "* Output: `{fully supported, partially supported, no support`\n", + "\n", + "4. The LLM generation from each chunk in `D` is a useful response to `x (question)` -\n", + "\n", + "* Input: `x (question)`, `y (generation)` for `d` in `D`\n", + "* `y (generation)` is a useful response to `x (question)`.\n", + "* Output: `{5, 4, 3, 2, 1}`\n", + "\n", + "We will implement some of these ideas from scratch using [LangGraph](https://langchain-ai.github.io/langgraph/).![image.png](attachment:ac58a69e-7bf3-481f-baea-1b9d6c6eba00.png)" + ] + }, + { + "cell_type": "markdown", + "id": "4244331f-1cdc-4548-b577-f758d1cd2bf9", + "metadata": {}, + "source": [ + "### Define the LangGraph Workflow\n", + "\n", + "\n", + "The LangGraph agent operates as a multi-step workflow.\n", + "\n", + "- It retrieves documents, grades their relevance, generates an answer, and ensures factual accuracy (e.g., no hallucinations).\n", + "- You can also implement self-reflection and query optimization using LangGraph nodes." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "8c4c9be8-396c-4ebc-a9e4-d4e11504c05c", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "from langchain_core.prompts import ChatPromptTemplate\n", + "from langchain_core.pydantic_v1 import BaseModel, Field\n", + "from langchain_openai import ChatOpenAI\n", + "\n", + "\n", + "# Data model\n", + "class GradeDocuments(BaseModel):\n", + " \"\"\"Binary score for relevance check on retrieved documents.\"\"\"\n", + "\n", + " binary_score: str = Field(\n", + " description=\"Documents are relevant to the question, 'yes' or 'no'\"\n", + " )\n", + "\n", + "\n", + "# LLM with function call\n", + "llm = ChatOpenAI(model=\"gpt-4o-mini\", temperature=0)\n", + "structured_llm_grader = llm.with_structured_output(GradeDocuments)\n", + "\n", + "# Prompt\n", + "system = \"\"\"You are a grader assessing relevance of a retrieved document to a user question. \\n \n", + " It does not need to be a stringent test. The goal is to filter out erroneous retrievals. \\n\n", + " If the document contains keyword(s) or semantic meaning related to the user question, grade it as relevant. \\n\n", + " Give a binary score 'yes' or 'no' score to indicate whether the document is relevant to the question.\"\"\"\n", + "grade_prompt = ChatPromptTemplate.from_messages(\n", + " [\n", + " (\"system\", system),\n", + " (\"human\", \"Retrieved document: \\n\\n {document} \\n\\n User question: {question}\"),\n", + " ]\n", + ")\n", + "\n", + "retrieval_grader = grade_prompt | structured_llm_grader\n", + "\n", + "\n", + "question = \"self rag performance\"\n", + "docs = retriever.get_relevant_documents(question)\n", + "doc_txt = docs[1].page_content\n", + "print(retrieval_grader.invoke({\"question\": question, \"document\": doc_txt}))" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "499173a5-8df0-4fe6-bb46-552ce496b8ba", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "from langchain import hub\n", + "from langchain_core.output_parsers import StrOutputParser\n", + "\n", + "# Prompt\n", + "prompt = hub.pull(\"rlm/rag-prompt\")\n", + "\n", + "# LLM\n", + "llm = ChatOpenAI(model_name=\"gpt-3.5-turbo\", temperature=0)\n", + "\n", + "\n", + "# Post-processing\n", + "def format_docs(docs):\n", + " return \"\\n\\n\".join(doc.page_content for doc in docs)\n", + "\n", + "\n", + "# Chain\n", + "rag_chain = prompt | llm | StrOutputParser()\n", + "\n", + "# Run\n", + "generation = rag_chain.invoke({\"context\": docs, \"question\": question})\n", + "print(generation)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "d9055d3b-8e9b-494f-bf4f-d02c153639c7", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "class GradeHallucinations(BaseModel):\n", + " \"\"\"Binary score for hallucination present in generation answer.\"\"\"\n", + "\n", + " binary_score: str = Field(\n", + " description=\"Answer is grounded in the facts, 'yes' or 'no'\"\n", + " )\n", + "\n", + "\n", + "# LLM with function call\n", + "llm = ChatOpenAI(model=\"gpt-4o-mini\", temperature=0)\n", + "structured_llm_grader = llm.with_structured_output(GradeHallucinations)\n", + "\n", + "# Prompt\n", + "system = \"\"\"You are a grader assessing whether an LLM generation is grounded in / supported by a set of retrieved facts. \\n \n", + " Give a binary score 'yes' or 'no'. 'Yes' means that the answer is grounded in / supported by the set of facts.\"\"\"\n", + "hallucination_prompt = ChatPromptTemplate.from_messages(\n", + " [\n", + " (\"system\", system),\n", + " (\"human\", \"Set of facts: \\n\\n {documents} \\n\\n LLM generation: {generation}\"),\n", + " ]\n", + ")\n", + "\n", + "hallucination_grader = hallucination_prompt | structured_llm_grader\n", + "hallucination_grader.invoke({\"documents\": docs, \"generation\": generation})" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "f92d351d-5ee1-48cc-ad8f-9b57ceed18ed", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "class GradeAnswer(BaseModel):\n", + " \"\"\"Binary score to assess answer addresses question.\"\"\"\n", + "\n", + " binary_score: str = Field(\n", + " description=\"Answer addresses the question, 'yes' or 'no'\"\n", + " )\n", + "\n", + "\n", + "# LLM with function call\n", + "llm = ChatOpenAI(model=\"gpt-4o-mini\", temperature=0)\n", + "structured_llm_grader = llm.with_structured_output(GradeAnswer)\n", + "\n", + "# Prompt\n", + "system = \"\"\"You are a grader assessing whether an answer addresses / resolves a question \\n \n", + " Give a binary score 'yes' or 'no'. Yes' means that the answer resolves the question.\"\"\"\n", + "answer_prompt = ChatPromptTemplate.from_messages(\n", + " [\n", + " (\"system\", system),\n", + " (\"human\", \"User question: \\n\\n {question} \\n\\n LLM generation: {generation}\"),\n", + " ]\n", + ")\n", + "\n", + "answer_grader = answer_prompt | structured_llm_grader\n", + "answer_grader.invoke({\"question\": question, \"generation\": generation})" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "286a9a2a-2a62-4004-84a2-fc82e6996cf1", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "llm = ChatOpenAI(model=\"gpt-4o-mini\", temperature=0)\n", + "\n", + "# Prompt\n", + "system = \"\"\"You a question re-writer that converts an input question to a better version that is optimized \\n \n", + " for vectorstore retrieval. Look at the input and try to reason about the underlying semantic intent / meaning.\"\"\"\n", + "re_write_prompt = ChatPromptTemplate.from_messages(\n", + " [\n", + " (\"system\", system),\n", + " (\n", + " \"human\",\n", + " \"Here is the initial question: \\n\\n {question} \\n Formulate an improved question.\",\n", + " ),\n", + " ]\n", + ")\n", + "\n", + "question_rewriter = re_write_prompt | llm | StrOutputParser()\n", + "question_rewriter.invoke({\"question\": question})" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "cb9bd597-a683-433a-879d-7991cfed5cf7", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "from typing import List\n", + "\n", + "from typing_extensions import TypedDict\n", + "\n", + "\n", + "class GraphState(TypedDict):\n", + " \"\"\"\n", + " Represents the state of our graph.\n", + "\n", + " Attributes:\n", + " question: question\n", + " generation: LLM generation\n", + " documents: list of documents\n", + " \"\"\"\n", + "\n", + " question: str\n", + " generation: str\n", + " documents: List[str]" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "398e5cc2-9c41-465d-a4f4-93c82ed452f2", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "def retrieve(state):\n", + " \"\"\"\n", + " Retrieve documents\n", + "\n", + " Args:\n", + " state (dict): The current graph state\n", + "\n", + " Returns:\n", + " state (dict): New key added to state, documents, that contains retrieved documents\n", + " \"\"\"\n", + " print(\"---RETRIEVE---\")\n", + " question = state[\"question\"]\n", + "\n", + " # Retrieval\n", + " documents = retriever.get_relevant_documents(question)\n", + " return {\"documents\": documents, \"question\": question}\n", + "\n", + "\n", + "def generate(state):\n", + " \"\"\"\n", + " Generate answer\n", + "\n", + " Args:\n", + " state (dict): The current graph state\n", + "\n", + " Returns:\n", + " state (dict): New key added to state, generation, that contains LLM generation\n", + " \"\"\"\n", + " print(\"---GENERATE---\")\n", + " question = state[\"question\"]\n", + " documents = state[\"documents\"]\n", + "\n", + " # RAG generation\n", + " generation = rag_chain.invoke({\"context\": documents, \"question\": question})\n", + " return {\"documents\": documents, \"question\": question, \"generation\": generation}\n", + "\n", + "\n", + "def grade_documents(state):\n", + " \"\"\"\n", + " Determines whether the retrieved documents are relevant to the question.\n", + "\n", + " Args:\n", + " state (dict): The current graph state\n", + "\n", + " Returns:\n", + " state (dict): Updates documents key with only filtered relevant documents\n", + " \"\"\"\n", + "\n", + " print(\"---CHECK DOCUMENT RELEVANCE TO QUESTION---\")\n", + " question = state[\"question\"]\n", + " documents = state[\"documents\"]\n", + "\n", + " # Score each doc\n", + " filtered_docs = []\n", + " for d in documents:\n", + " score = retrieval_grader.invoke(\n", + " {\"question\": question, \"document\": d.page_content}\n", + " )\n", + " grade = score.binary_score\n", + " if grade == \"yes\":\n", + " print(\"---GRADE: DOCUMENT RELEVANT---\")\n", + " filtered_docs.append(d)\n", + " else:\n", + " print(\"---GRADE: DOCUMENT NOT RELEVANT---\")\n", + " continue\n", + " return {\"documents\": filtered_docs, \"question\": question}\n", + "\n", + "\n", + "def transform_query(state):\n", + " \"\"\"\n", + " Transform the query to produce a better question.\n", + "\n", + " Args:\n", + " state (dict): The current graph state\n", + "\n", + " Returns:\n", + " state (dict): Updates question key with a re-phrased question\n", + " \"\"\"\n", + "\n", + " print(\"---TRANSFORM QUERY---\")\n", + " question = state[\"question\"]\n", + " documents = state[\"documents\"]\n", + "\n", + " # Re-write question\n", + " better_question = question_rewriter.invoke({\"question\": question})\n", + " return {\"documents\": documents, \"question\": better_question}\n", + "\n", + "\n", + "### Edges\n", + "\n", + "\n", + "def decide_to_generate(state):\n", + " \"\"\"\n", + " Determines whether to generate an answer, or re-generate a question.\n", + "\n", + " Args:\n", + " state (dict): The current graph state\n", + "\n", + " Returns:\n", + " str: Binary decision for next node to call\n", + " \"\"\"\n", + "\n", + " print(\"---ASSESS GRADED DOCUMENTS---\")\n", + " state[\"question\"]\n", + " filtered_documents = state[\"documents\"]\n", + "\n", + " if not filtered_documents:\n", + " # All documents have been filtered check_relevance\n", + " # We will re-generate a new query\n", + " print(\n", + " \"---DECISION: ALL DOCUMENTS ARE NOT RELEVANT TO QUESTION, TRANSFORM QUERY---\"\n", + " )\n", + " return \"transform_query\"\n", + " else:\n", + " # We have relevant documents, so generate answer\n", + " print(\"---DECISION: GENERATE---\")\n", + " return \"generate\"\n", + "\n", + "\n", + "def grade_generation_v_documents_and_question(state):\n", + " \"\"\"\n", + " Determines whether the generation is grounded in the document and answers question.\n", + "\n", + " Args:\n", + " state (dict): The current graph state\n", + "\n", + " Returns:\n", + " str: Decision for next node to call\n", + " \"\"\"\n", + "\n", + " print(\"---CHECK HALLUCINATIONS---\")\n", + " question = state[\"question\"]\n", + " documents = state[\"documents\"]\n", + " generation = state[\"generation\"]\n", + "\n", + " score = hallucination_grader.invoke(\n", + " {\"documents\": documents, \"generation\": generation}\n", + " )\n", + " grade = score.binary_score\n", + "\n", + " # Check hallucination\n", + " if grade == \"yes\":\n", + " print(\"---DECISION: GENERATION IS GROUNDED IN DOCUMENTS---\")\n", + " # Check question-answering\n", + " print(\"---GRADE GENERATION vs QUESTION---\")\n", + " score = answer_grader.invoke({\"question\": question, \"generation\": generation})\n", + " grade = score.binary_score\n", + " if grade == \"yes\":\n", + " print(\"---DECISION: GENERATION ADDRESSES QUESTION---\")\n", + " return \"useful\"\n", + " else:\n", + " print(\"---DECISION: GENERATION DOES NOT ADDRESS QUESTION---\")\n", + " return \"not useful\"\n", + " else:\n", + " pprint(\"---DECISION: GENERATION IS NOT GROUNDED IN DOCUMENTS, RE-TRY---\")\n", + " return \"not supported\"" + ] + }, + { + "cell_type": "markdown", + "id": "b77061d5-fa7f-4dbc-ad5b-cc409b931bf4", + "metadata": {}, + "source": [ + "### Compose the graph" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "f5ae35f7-ff5c-40f1-9b0e-1a8bac0dc58f", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "from langgraph.graph import END, StateGraph, START\n", + "\n", + "workflow = StateGraph(GraphState)\n", + "\n", + "# Define the nodes\n", + "workflow.add_node(\"retrieve\", retrieve) # retrieve\n", + "workflow.add_node(\"grade_documents\", grade_documents) # grade documents\n", + "workflow.add_node(\"generate\", generate) # generatae\n", + "workflow.add_node(\"transform_query\", transform_query) # transform_query\n", + "\n", + "# Build graph\n", + "workflow.add_edge(START, \"retrieve\")\n", + "workflow.add_edge(\"retrieve\", \"grade_documents\")\n", + "workflow.add_conditional_edges(\n", + " \"grade_documents\",\n", + " decide_to_generate,\n", + " {\n", + " \"transform_query\": \"transform_query\",\n", + " \"generate\": \"generate\",\n", + " },\n", + ")\n", + "workflow.add_edge(\"transform_query\", \"retrieve\")\n", + "workflow.add_conditional_edges(\n", + " \"generate\",\n", + " grade_generation_v_documents_and_question,\n", + " {\n", + " \"not supported\": \"generate\",\n", + " \"useful\": END,\n", + " \"not useful\": \"transform_query\",\n", + " },\n", + ")\n", + "\n", + "# Compile\n", + "app = workflow.compile()" + ] + }, + { + "cell_type": "markdown", + "id": "074bd11c-0252-46cc-8903-5fa29c327e96", + "metadata": {}, + "source": [ + "# Run with a sample question\n", + "\n", + "Test the `/agent` endpoint by sending a sample question. The server runs the entire RAG pipeline and returns the answer." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "4676f4c8-26bd-4f36-aadf-80771b001b59", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "inputs = {\n", + " \"question\": \"How does self-RAG work? Explain the steps involved in the implementation.\"\n", + "}" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "b84144d8-e266-4a13-a515-a55fcdc02f11", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "result = app.invoke(inputs)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "a5689309-1780-4725-8d3f-364841303def", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "result[\"generation\"]" + ] + }, + { + "cell_type": "markdown", + "id": "94575479-3639-46de-83d6-6633fc22c00d", + "metadata": {}, + "source": [ + "> This notebook was inspired by this [LangGraph cookbook](https://github.com/langchain-ai/langgraph/blob/e3ca7bb3e9d34b09633852f4d08d55f6dcd4364b/examples/rag/langgraph_self_rag.ipynb)" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3 (ipykernel)", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.11.11" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +}