From 597455a7fa3bd558edc95790eacae898026c0e66 Mon Sep 17 00:00:00 2001 From: lspataroG <167472995+lspataroG@users.noreply.github.com> Date: Tue, 11 Feb 2025 17:38:41 +0100 Subject: [PATCH] feat: Add notebook for bigframes RAG (#1677) # Description This notebook showcases the power of BigFrames for building production-ready RAG pipelines on Google Cloud. We leveraged BigFrames' seamless integration with BigQuery and Vertex AI to efficiently process and embed large text datasets. --- .github/CODEOWNERS | 1 + .github/actions/spelling/allow.txt | 9 + .../scalable_rag_with_bigframes.ipynb | 1106 +++++++++++++++++ 3 files changed, 1116 insertions(+) create mode 100644 gemini/use-cases/retrieval-augmented-generation/scalable_rag_with_bigframes.ipynb diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS index 85aa93b0754..bc93bb552a5 100644 --- a/.github/CODEOWNERS +++ b/.github/CODEOWNERS @@ -71,6 +71,7 @@ /generative-ai/genkit/postcard-generator @mattsday @GoogleCloudPlatform/generative-ai-devrel /generative-ai/gemini/evaluation/evaluate_langchain_chains.ipynb @eliasecchig @GoogleCloudPlatform/generative-ai-devrel /generative-ai/gemini/use-cases/retrieval-augmented-generation/rag_qna_with_bq_and_featurestore.ipynb @eliasecchig @lspatarog @GoogleCloudPlatform/generative-ai-devrel +/generative-ai/gemini/use-cases/retrieval-augmented-generation/rag_pipeline_terabyte_scale_with_bigframes.ipynb @lspatarog @eliasecchig /generative-ai/gemini/tuning/gemini_supervised_finetuning_text_classification.ipynb @gabrielahrlr @eliasecchig @GoogleCloudPlatform/generative-ai-devrel /generative-ai/open-models/serving @polong-lin @GoogleCloudPlatform/generative-ai-devrel /generative-ai/open-models/use-cases/cloud_run_ollama_gemma2_rag_qa.ipynb @eliasecchig @GoogleCloudPlatform/generative-ai-devrel diff --git a/.github/actions/spelling/allow.txt b/.github/actions/spelling/allow.txt index 4c7e1a3fd72..fd7d5a7abf5 100644 --- a/.github/actions/spelling/allow.txt +++ b/.github/actions/spelling/allow.txt @@ -76,6 +76,7 @@ DRV DVDs DWMWA Daniil +Dedup DeepEval Depatmint Dexin @@ -408,6 +409,8 @@ Takeaways Tbk Tbl Tencent +Terabyte +Terabytes Testables Tetsuo Tianli @@ -521,6 +524,7 @@ barmode barpolar baxis bbc +bbq beir bff bgswap @@ -603,6 +607,7 @@ dbln dcg ddbb ddl +dedup deepeval deepseek demouser @@ -1004,6 +1009,7 @@ quadrotor qubit qubits quippy +rag ragas ragdemos rahhhrr @@ -1094,6 +1100,8 @@ takeaways tastetest tavily tencel +terabyte +terabytes termcolor terraform texting @@ -1126,6 +1134,7 @@ tts tures typehints ubuntu +udf uids ultrawide und diff --git a/gemini/use-cases/retrieval-augmented-generation/scalable_rag_with_bigframes.ipynb b/gemini/use-cases/retrieval-augmented-generation/scalable_rag_with_bigframes.ipynb new file mode 100644 index 00000000000..64c8a6fdcaf --- /dev/null +++ b/gemini/use-cases/retrieval-augmented-generation/scalable_rag_with_bigframes.ipynb @@ -0,0 +1,1106 @@ +{ + "cells": [ + { + "cell_type": "code", + "execution_count": 1, + "metadata": { + "id": "ur8xi4C7S06n" + }, + "outputs": [], + "source": [ + "# Copyright 2025 Google LLC\n", + "#\n", + "# Licensed under the Apache License, Version 2.0 (the \"License\");\n", + "# you may not use this file except in compliance with the License.\n", + "# You may obtain a copy of the License at\n", + "#\n", + "# https://www.apache.org/licenses/LICENSE-2.0\n", + "#\n", + "# Unless required by applicable law or agreed to in writing, software\n", + "# distributed under the License is distributed on an \"AS IS\" BASIS,\n", + "# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\n", + "# See the License for the specific language governing permissions and\n", + "# limitations under the License." + ] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "JAPoU8Sm5E6e" + }, + "source": [ + "# Production & Scalable RAG Pipeline Using BigFrames\n", + "\n", + "\n", + " \n", + " \n", + " \n", + " \n", + " \n", + "
\n", + " \n", + " \"Google
Open in Colab\n", + "
\n", + "
\n", + " \n", + " \"Google
Open in Colab Enterprise\n", + "
\n", + "
\n", + " \n", + " \"Vertex
Open in Vertex AI Workbench\n", + "
\n", + "
\n", + " \n", + " \"BigQuery
Open in BigQuery Studio\n", + "
\n", + "
\n", + " \n", + " \"GitHub
View on GitHub\n", + "
\n", + "
\n", + "\n", + "
\n", + "\n", + "\n", + "Share to:\n", + "\n", + "\n", + " \"LinkedIn\n", + "\n", + "\n", + "\n", + " \"Bluesky\n", + "\n", + "\n", + "\n", + " \"X\n", + "\n", + "\n", + "\n", + " \"Reddit\n", + "\n", + "\n", + "\n", + " \"Facebook\n", + "" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "84f0f73a0f76" + }, + "source": [ + "| | |\n", + "|-|-|\n", + "| Authors | [Lorenzo Spataro](https://github.com/lspataroG), [Elia Secchi](https://github.com/eliasecchig) |\n" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "tvgnzT1CKxrO" + }, + "source": [ + "# Overview\n", + "\n", + "This notebook demonstrates how to use [BigFrames](https://cloud.google.com/python/docs/reference/bigframes/latest) and [LangChain](http://python.langchain.com/docs) to build a RAG (Retrieval Augmented Generation) pipeline using Vertex AI.\n", + "\n", + "Specifically, we are going to build a data pipeline capable of being deployed in a production environment with scheduled execution.\n", + "\n", + "You will learn how to:\n", + "- Load data from BigQuery into BigFrames\n", + "- Create embeddings using Vertex AI models\n", + "- Build a vector store using BigQuery\n", + "- Create a RAG pipeline using LangChain\n", + "- Query your data using natural language\n", + "\n", + "## What is BigQuery DataFrames?\n", + "BigQuery DataFrames, also called [BigFrames](https://cloud.google.com/python/docs/reference/bigframes/latest), lets you process data in BigQuery using familiar Python APIs like pandas and scikit-learn. It works by converting Python code into optimized SQL that runs directly in BigQuery.\n", + "Key benefits:\n", + "- Process terabytes of data using Python without moving it out of BigQuery\n", + "- Train ML models directly in BigQuery using scikit-learn syntax\n", + "- A wide range of popular pandas and scikit-learn APIs are available through SQL conversion\n", + "- Lazy evaluation for better performance\n", + "- Custom Python functions run as BigQuery remote functions\n", + "- Vertex AI integration for Gemini model access\n", + "\n", + "The following diagram describes the workflow of BigQuery DataFrames:\n", + "![BigQuery DataFrames Workflow](https://cloud.google.com/static/bigquery/images/dataframes-workflow.png)\n" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "61RBz8LLbxCR" + }, + "source": [ + "# Get started" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "No17Cw5hgx12" + }, + "source": [ + "## Install Vertex AI SDK and other required packages\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "id": "tFy3H3aPgx12" + }, + "outputs": [], + "source": [ + "%pip install --upgrade --user --quiet google-cloud-aiplatform \"bigframes\" langchain markdownify swifter \"langchain-google-community[featurestore]\" langchain-google-vertexai\n" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "R5Xep4W9lq-Z" + }, + "source": [ + "## Restart runtime\n", + "\n", + "To use the newly installed packages in this Jupyter runtime, you must restart the runtime. You can do this by running the cell below, which restarts the current kernel.\n", + "\n", + "The restart might take a minute or longer. After it's restarted, continue to the next step." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "id": "XRvKdaPDTznN" + }, + "outputs": [], + "source": [ + "import IPython\n", + "\n", + "app = IPython.Application.instance()\n", + "app.kernel.do_shutdown(True)" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "SbmM4z7FOBpM" + }, + "source": [ + "
\n", + "⚠️ The kernel is going to restart. In Colab or Colab Enterprise, you might see an error message that says \"Your session crashed for an unknown reason.\" This is expected. Wait until it's finished before continuing to the next step. ⚠️\n", + "
\n" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "dmWOrTJ3gx13" + }, + "source": [ + "## Authenticate your notebook environment (Colab only)\n", + "\n", + "If you're running this notebook on Google Colab, run the cell below to authenticate your environment." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "id": "NyKGtVQjgx13" + }, + "outputs": [], + "source": [ + "import sys\n", + "\n", + "if \"google.colab\" in sys.modules:\n", + " from google.colab import auth\n", + "\n", + " auth.authenticate_user()" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "DF4l8DTdWgPY" + }, + "source": [ + "## Set Google Cloud project information and initialize Vertex AI SDK\n", + "\n", + "To get started using Vertex AI, you must have an existing Google Cloud project and [enable the Vertex AI API](https://console.cloud.google.com/flows/enableapi?apiid=aiplatform.googleapis.com).\n", + "\n", + "Learn more about [setting up a project and a development environment](https://cloud.google.com/vertex-ai/docs/start/cloud-environment).\n", + "\n", + "Alternatively you can also enable the Vertex AI API by uncommenting and running the following command:" + ] + }, + { + "cell_type": "code", + "execution_count": 2, + "metadata": { + "id": "Nqwi-5ufWp_B" + }, + "outputs": [], + "source": [ + "# Use the environment variable if the user doesn't provide Project ID.\n", + "import datetime\n", + "import os\n", + "\n", + "import bigframes.ml.llm as llm\n", + "import bigframes.pandas as bpd\n", + "from google.cloud import bigquery\n", + "import vertexai\n", + "\n", + "PROJECT_ID = (\n", + " \"your-project-id\" # @param {type: \"string\", placeholder: \"your-project-id\"}\n", + ")\n", + "if not PROJECT_ID or PROJECT_ID == \"your-project-id\":\n", + " PROJECT_ID = str(os.environ.get(\"GOOGLE_CLOUD_PROJECT\"))\n", + "\n", + "# GOOGLE_CLOUD_REGION must be in a US region because the source dataset is in US\n", + "LOCATION = os.environ.get(\"GOOGLE_CLOUD_REGION\", \"us-central1\")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "id": "ee9b59177653" + }, + "outputs": [], + "source": [ + "# Set the Google Cloud project and enable the Vertex AI API\n", + "! gcloud config set project $PROJECT_ID && gcloud services enable aiplatform.googleapis.com" + ] + }, + { + "cell_type": "code", + "execution_count": 4, + "metadata": { + "id": "aae998acc800" + }, + "outputs": [], + "source": [ + "# Set project and location for Vertex, BigQuery and BigFrames\n", + "vertexai.init(project=PROJECT_ID, location=LOCATION)\n", + "\n", + "bq_client = bigquery.Client(project=PROJECT_ID, location=\"US\")\n", + "bpd.options.bigquery.project = PROJECT_ID\n", + "bpd.options.bigquery.location = \"US\"" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "5303c05f7aa6" + }, + "source": [ + "### Import libraries" + ] + }, + { + "cell_type": "code", + "execution_count": 5, + "metadata": { + "id": "Myuk3g_Jrvob" + }, + "outputs": [], + "source": [ + "from datetime import datetime, timedelta\n", + "\n", + "# Standard library imports\n", + "import json\n", + "\n", + "# Third-party imports\n", + "from langchain.text_splitter import RecursiveCharacterTextSplitter\n", + "from langchain_google_community import BigQueryVectorStore\n", + "from langchain_google_vertexai import VertexAIEmbeddings\n", + "from markdownify import markdownify" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "8QNXi4qKrvob" + }, + "source": [ + "### Variables definition" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "sx97BvG-qn1E" + }, + "source": [ + "As we're building a pipeline intended to run regularly on a schedule, we're going to set up some time-dependent variables:\n", + "\n", + "* `RUN_DATE`: The date the process runs\n", + "* `IS_INCREMENTAL`: If `True`, only query recent data; otherwise, query the whole dataset\n", + "* `LOOK_BACK_DAYS`: If `IS_INCREMENTAL=True`, defines how many days in the past to query\n" + ] + }, + { + "cell_type": "code", + "execution_count": 6, + "metadata": { + "id": "cB7Zf9WCrvob" + }, + "outputs": [], + "source": [ + "IS_INCREMENTAL = True # Flag to enable incremental processing\n", + "RUN_DATE = datetime.strptime(\n", + " \"2022-09-26\", \"%Y-%m-%d\"\n", + ").date() # Set as the last date of the dataset for demonstration purposes (ie. there is no data after that)\n", + "LOOK_BACK_DAYS = 1 # Number of days to look back from RUN_DATE\n", + "START_DATE = str(\n", + " RUN_DATE - timedelta(days=LOOK_BACK_DAYS)\n", + ") # Start date for data processing window\n", + "END_DATE = str(RUN_DATE) # End date for data processing window" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "eZRyBMJprvob" + }, + "source": [ + "# 1. Data Loading and initial preprocessing\n", + "This section retrieves and examines Stack Overflow Python Q&A data from the public BigQuery table `production-ai-template.stackoverflow_qa.stackoverflow_python_questions_and_answers`. The data comes from the official [Stack Overflow public dataset](https://console.cloud.google.com/marketplace/product/stack-exchange/stack-overflow) and contains a sample of Python-related questions and their corresponding answers.\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "id": "70vCCTMmrvob" + }, + "outputs": [], + "source": [ + "query = f\"\"\"\n", + " SELECT\n", + " creation_date,\n", + " last_edit_date,\n", + " question_id,\n", + " question_title,\n", + " question_body AS question_text,\n", + " answers\n", + " FROM `production-ai-template.stackoverflow_qa.stackoverflow_python_questions_and_answers`\n", + " WHERE TRUE\n", + " # If IS_INCREMENTAL is True, filter records between START_DATE and END_DATE\n", + " # Otherwise, include all records without date filtering\n", + " {f'AND TIMESTAMP_TRUNC(creation_date, DAY) BETWEEN TIMESTAMP(\"{START_DATE}\") AND TIMESTAMP(\"{END_DATE}\")' if IS_INCREMENTAL else ''}\n", + "\"\"\"\n", + "df = bpd.read_gbq(query)\n", + "df.head(2)" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "9xpXwWB7rvoc" + }, + "source": [ + "## Data Cleaning and Markdown Conversion\n", + "In this step, we clean the raw data by converting HTML content to Markdown format for better readability and processing.\n", + "We transform both questions and answers from HTML to Markdown, structure the content with proper headings,\n", + "and combine them into a unified text format that will be easier to work with in subsequent steps.\n", + "\n", + "> Note: In this case, we are leveraging BigFrames' capability to pull data into memory, where the processing happens. This allows us to efficiently transform and clean the data using pandas-like operations. Later, we will demonstrate how to scale this data processing using Remote Functions." + ] + }, + { + "cell_type": "code", + "execution_count": 8, + "metadata": { + "id": "unjauyZUrvoc" + }, + "outputs": [], + "source": [ + "def convert_html_to_markdown(html: str) -> str:\n", + " \"\"\"Convert HTML into Markdown for easier parsing and rendering after LLM response.\"\"\"\n", + " return markdownify(html).strip()\n", + "\n", + "\n", + "def create_answers_markdown(answers: list[dict]) -> str:\n", + " \"\"\"Convert each answer's HTML to markdown and concatenate into a single markdown text.\"\"\"\n", + " answers_md = \"\"\n", + " for index, answer_record in enumerate(answers):\n", + " answers_md += (\n", + " f\"\\n\\n## Answer {index + 1}:\\n\" # Answer number is H2 heading size\n", + " )\n", + " answers_md += convert_html_to_markdown(answer_record[\"body\"])\n", + " return answers_md" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "id": "sHuxypiPrvoc" + }, + "outputs": [], + "source": [ + "# Sort, deduplicate and reset index in one operation\n", + "df = (\n", + " df.sort_values(\"last_edit_date\", ascending=False)\n", + " .drop_duplicates(\"question_id\")\n", + " .reset_index(drop=True)\n", + ")\n", + "\n", + "# Create markdown fields efficiently\n", + "df[\"question_title_md\"] = \"# \" + df[\"question_title\"] + \"\\n\" # Title is H1 heading size\n", + "df[\"question_text_md\"] = (\n", + " df[\"question_text\"].to_pandas().apply(convert_html_to_markdown) + \"\\n\"\n", + ")\n", + "df[\"answers_md\"] = df[\"answers\"].to_pandas().apply(create_answers_markdown)\n", + "\n", + "# Create a column containing the whole markdown text\n", + "df[\"full_text_md\"] = df[\"question_title_md\"] + df[\"question_text_md\"] + df[\"answers_md\"]" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "id": "f06xaGIRrvoc" + }, + "outputs": [], + "source": [ + "# Select final columns for the cleaned dataset\n", + "final_cols = [\"last_edit_date\", \"question_id\", \"question_text\", \"full_text_md\"]\n", + "df = df[final_cols]\n", + "df.head(2)" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "Ilb9nQzLrvoc" + }, + "source": [ + "# 2. Text Chunking\n", + "The text data is in Markdown format, requiring a thoughtful chunking approach. While we currently use a basic character-based splitter, production systems typically employ more sophisticated techniques:\n", + "\n", + "- Preserve semantic units like paragraphs and sections\n", + "- Maintain markdown structure and hierarchy\n", + "- Keep related content together (e.g., questions with their answers)\n", + "- Use overlapping chunks to maintain context across boundaries\n", + "- Consider special markdown elements like code blocks and lists\n", + "\n", + "This helps ensure chunks remain coherent and meaningful for downstream tasks.\n", + "\n", + "For simplicity, we will continue using a basic character-based splitter. The primary focus of this notebook, in fact, is demonstrating scalable Gen AI data processing." + ] + }, + { + "cell_type": "code", + "execution_count": 11, + "metadata": { + "id": "3Z0mITgYrvoc" + }, + "outputs": [], + "source": [ + "text_splitter = RecursiveCharacterTextSplitter(\n", + " chunk_size=1500,\n", + " chunk_overlap=20,\n", + " length_function=len,\n", + ")" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "lslvpk7Qrvoc" + }, + "source": [ + "Apply text chunking to each document locally using pandas and swifter\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "id": "xEUabTuTrvoc" + }, + "outputs": [], + "source": [ + "df[\"text_chunk\"] = (\n", + " df[\"full_text_md\"]\n", + " .to_pandas()\n", + " .astype(object)\n", + " .swifter.apply(text_splitter.split_text)\n", + ")\n", + "df.head(2)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "id": "P7aiNG-cFC8y" + }, + "outputs": [], + "source": [ + "# Compute the sequential index of a chunk within the list of chunks for each question\n", + "chunk_ids = [\n", + " str(idx) for text_chunk in df[\"text_chunk\"] for idx in range(len(text_chunk))\n", + "]\n", + "# Explode the chunk list so that we get a row per chunk\n", + "df = df.explode(\"text_chunk\").reset_index(drop=True)\n", + "# Assigning the chunk_id as question_id + sequential index of the chunk\n", + "df[\"chunk_id\"] = df[\"question_id\"].astype(\"string\") + \"__\" + chunk_ids" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "id": "Q046iLWsrvod" + }, + "outputs": [], + "source": [ + "df.head()" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "AeOhgIdzrvod" + }, + "source": [ + "# 3. Embedding\n", + "\n", + "To generate embeddings, we leverage the seamless integration between BigFrames, BigQuery, and Vertex AI.\n", + "This integration allows us to efficiently generate embeddings through Vertex AI's batch scoring process.\n", + "The `text-embedding-005` model converts each text chunk into a high-dimensional vector representation,\n", + "enabling semantic search and similarity analysis.\n", + "\n", + "> Note: This step might take a few minutes to complete.\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "id": "a7UZ2aXervod" + }, + "outputs": [], + "source": [ + "# Initialize the embedding model\n", + "embedder = llm.TextEmbeddingGenerator(model_name=\"text-embedding-005\")\n", + "\n", + "# Generate embeddings\n", + "embeddings_df = embedder.predict(df[\"text_chunk\"])\n", + "df = df.assign(\n", + " embedding_result=embeddings_df[\"ml_generate_embedding_result\"],\n", + " embedding_statistics=embeddings_df[\"ml_generate_embedding_statistics\"],\n", + " embedding_status=embeddings_df[\"ml_generate_embedding_status\"],\n", + ")\n", + "current_timestamp = datetime.now()\n", + "df[\"creation_timestamp\"] = current_timestamp\n", + "\n", + "df.head()" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "BhGB484Nrvod" + }, + "source": [ + "We can now notice 4 new columns added to our DataFrame!" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "o7NwgaQmrvod" + }, + "source": [ + "# 4. Saving results\n", + "\n", + "We are now ready to save the results of the processing to a BigQuery table, for consumption by the different Vector DBs we might want to use.\n", + "The incremental writing strategy allows us to efficiently update our embeddings table by:\n", + "1. Only processing new/modified questions since last run (controlled by `IS_INCREMENTAL` flag)\n", + "2. Appending new embeddings to the existing table when `IS_INCREMENTAL=True`\n", + "3. Replacing the entire table when `IS_INCREMENTAL=False`\n", + "\n", + "Since we may end up with duplicate entries when doing incremental updates\n", + "(e.g. if a question was modified multiple times), we'll need to deduplicate\n", + "the table afterwards to keep only the latest version of each question.\n", + "The deduplication will be done based on question_id, keeping the row with the most recent `creation_timestamp`.\n" + ] + }, + { + "cell_type": "code", + "execution_count": 16, + "metadata": { + "id": "tDZa-KlYrvod" + }, + "outputs": [], + "source": [ + "DESTINATION_DATASET_ID = \"stackoverflow_data\"\n", + "DESTINATION_TABLE_ID = \"incremental_questions_embeddings\"\n", + "PARTITION_DATE_COLUMN = \"creation_timestamp\"" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "8rX7Ckehrvod" + }, + "source": [ + "If it doesn't exist, let's create an empty table with partitioning and the right schema" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "id": "xdWlDzgAFC8y" + }, + "outputs": [], + "source": [ + "def create_table_if_not_exist(\n", + " df, project_id, dataset_id, table_id, partition_column, location=\"US\"\n", + "):\n", + " table_schema = bq_client.get_table(df.head(0).to_gbq()).schema\n", + "\n", + " # Create table schema with partitioning\n", + " table = bigquery.Table(f\"{project_id}.{dataset_id}.{table_id}\", schema=table_schema)\n", + " table.time_partitioning = bigquery.TimePartitioning(\n", + " type_=bigquery.TimePartitioningType.DAY, field=partition_column\n", + " )\n", + "\n", + " dataset = bigquery.Dataset(f\"{project_id}.{dataset_id}\")\n", + " dataset.location = location\n", + " bq_client.create_dataset(dataset, exists_ok=True)\n", + " table = bq_client.create_table(table=table, exists_ok=True)\n", + "\n", + "\n", + "create_table_if_not_exist(\n", + " df=df,\n", + " project_id=PROJECT_ID,\n", + " dataset_id=DESTINATION_DATASET_ID,\n", + " table_id=DESTINATION_TABLE_ID,\n", + " partition_column=PARTITION_DATE_COLUMN,\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "id": "xAyS5F6trvod" + }, + "outputs": [], + "source": [ + "# If IS_INCREMENTAL is True, append new data to existing table\n", + "# If IS_INCREMENTAL is False, replace entire table with new data\n", + "if_exists_mode = \"append\" if IS_INCREMENTAL else \"replace\"\n", + "\n", + "incremental_table_id = df.to_gbq(\n", + " destination_table=f\"{DESTINATION_DATASET_ID}.{DESTINATION_TABLE_ID}\",\n", + " if_exists=if_exists_mode,\n", + ")" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "cIOzoRNHrvoe" + }, + "source": [ + "## Create a new Dedup table (Optional)\n", + "\n", + "If necessary, we can create a deduplication table to address duplicate questions that may appear in the dataset across different dates." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "id": "3_6VklCMFC83" + }, + "outputs": [], + "source": [ + "df_questions = bpd.read_gbq(\n", + " f\"{DESTINATION_DATASET_ID}.{DESTINATION_TABLE_ID}\", use_cache=False\n", + ")\n", + "max_date_df = (\n", + " df_questions.groupby(\"question_id\")[\"creation_timestamp\"].max().reset_index()\n", + ")\n", + "df_questions_dedup = max_date_df.merge(\n", + " df_questions, how=\"inner\", on=[\"question_id\", \"creation_timestamp\"]\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "id": "gazEjsfVFC83" + }, + "outputs": [], + "source": [ + "DESTINATION_DEDUPED_QUESTIONS_TABLE_ID = \"questions_embeddings\"\n", + "create_table_if_not_exist(\n", + " df=df_questions_dedup,\n", + " project_id=PROJECT_ID,\n", + " dataset_id=DESTINATION_DATASET_ID,\n", + " table_id=DESTINATION_DEDUPED_QUESTIONS_TABLE_ID,\n", + " partition_column=PARTITION_DATE_COLUMN,\n", + ")\n", + "\n", + "deduped_table_id = df_questions_dedup.to_gbq(\n", + " destination_table=f\"{DESTINATION_DATASET_ID}.{DESTINATION_DEDUPED_QUESTIONS_TABLE_ID}\",\n", + " if_exists=\"replace\",\n", + ")" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "3CwdVwyQrvoj" + }, + "source": [ + "# 5. Testing retrieval" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "90d9eda8a0bd" + }, + "source": [ + "Let's try to find similar documents based on an input query" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "id": "2VQiUuGkrvoj" + }, + "outputs": [], + "source": [ + "embedding_model = VertexAIEmbeddings(\n", + " model_name=\"text-embedding-005\", project=PROJECT_ID\n", + ")\n", + "bq_store = BigQueryVectorStore(\n", + " project_id=PROJECT_ID,\n", + " location=\"US\",\n", + " dataset_name=DESTINATION_DATASET_ID,\n", + " table_name=DESTINATION_DEDUPED_QUESTIONS_TABLE_ID,\n", + " embedding=embedding_model,\n", + " embedding_field=\"embedding_result\",\n", + " content_field=\"text_chunk\",\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "id": "U0fvQQwKrvoj" + }, + "outputs": [], + "source": [ + "# Perform similarity search and look at the most relevant documents\n", + "search_query = \"how do I read a csv file with python?\" # @param {type:\"string\"}\n", + "results = bq_store.similarity_search(search_query)\n", + "text_results = [x.page_content for x in results]\n", + "text_results" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "8e5b2959f009" + }, + "source": [ + "# 6. Answer Generation\n", + "\n", + "Now we can put everything together and use an LLM to answer a question based on Stack Overflow data!\n", + "\n", + "We are going to use LangChain and the [`RetrievalQA` chain](https://python.langchain.com/docs/versions/migrating_chains/retrieval_qa/) to build a very simple RAG chain.\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "id": "45ebc76dd07c" + }, + "outputs": [], + "source": [ + "from langchain import hub\n", + "from langchain.chains import create_retrieval_chain\n", + "from langchain.chains.combine_documents import create_stuff_documents_chain\n", + "from langchain_google_vertexai import ChatVertexAI\n", + "\n", + "# Convert the BigQuery VectorStore to a LangChain retriever\n", + "langchain_retriever = bq_store.as_retriever()\n", + "\n", + "# Init the VertexAI LLM\n", + "llm = ChatVertexAI(model_name=\"gemini-1.5-flash\")\n", + "\n", + "# See full prompt at https://smith.langchain.com/hub/langchain-ai/retrieval-qa-chat\n", + "retrieval_qa_chat_prompt = hub.pull(\"langchain-ai/retrieval-qa-chat\")\n", + "\n", + "combine_docs_chain = create_stuff_documents_chain(llm, retrieval_qa_chat_prompt)\n", + "rag_chain = create_retrieval_chain(langchain_retriever, combine_docs_chain)\n", + "\n", + "print(rag_chain.invoke({\"input\": search_query})[\"answer\"])" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "L2o86ME-rvoj" + }, + "source": [ + "# 7. Scaling data processing to Terabytes: BigFrame Remote Functions" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "unwXv4-Urvoj" + }, + "source": [ + "Sometimes data is too large to run local process when running custom Python functions.\n", + "In fact, every time we convert a series or a DataFrame to pandas using `to_pandas()`, the data is loaded into memory.\n", + "\n", + "To be able to run large datasets processes remotely, we can define remote UDF functions. Let's see an example of a BigFrames [remote function](https://cloud.google.com/bigquery/docs/samples/bigquery-dataframes-remote-function)." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "id": "6ap_IicZFC84" + }, + "outputs": [], + "source": [ + "import json\n", + "\n", + "import bigframes.bigquery as bbq\n", + "import bigframes.pandas\n", + "from langchain.text_splitter import RecursiveCharacterTextSplitter\n", + "\n", + "text_splitter = RecursiveCharacterTextSplitter(\n", + " chunk_size=100,\n", + " chunk_overlap=10,\n", + " length_function=len,\n", + ")\n", + "\n", + "# Create UDF for chunking\n", + "# Behind the scenes, BigFrames will automatically create a connection for you but you can also create a dedicated connection.\n", + "# See here: https://cloud.google.com/bigquery/docs/remote-functions#create_a_connection.\n", + "\n", + "\n", + "@bigframes.pandas.remote_function(packages=[\"langchain\"], reuse=True)\n", + "def chunk_text_udf(text: str) -> str:\n", + " return json.dumps(\n", + " [chunk.page_content for chunk in text_splitter.create_documents([text])]\n", + " )" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "4a96c961f473" + }, + "source": [ + "We are going to read the final table we saved earlier to show how to perform chunking using a Python remote function.\n", + "\n", + "The rest of the pandas custom functions could also be implemented in a similar way." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "id": "192ed33347fb" + }, + "outputs": [], + "source": [ + "# Reading the table we saved earlier for demonstration purposes\n", + "final_cols = [\"last_edit_date\", \"question_id\", \"question_text\", \"full_text_md\"]\n", + "\n", + "df_udf = bpd.read_gbq(\n", + " f\"{DESTINATION_DATASET_ID}.{DESTINATION_DEDUPED_QUESTIONS_TABLE_ID}\",\n", + " use_cache=False,\n", + ")[final_cols]\n", + "\n", + "# Sort, deduplicate and reset index in one operation\n", + "df_udf = (\n", + " df_udf.sort_values(\"last_edit_date\", ascending=False)\n", + " .drop_duplicates(\"question_id\")\n", + " .reset_index(drop=True)\n", + ")\n", + "df_udf.head()" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "aWTS6Oz5FC84" + }, + "source": [ + "We are using the UDF to chunk the data and return a list of text chunks.\n", + "\n", + "Since BigFrames UDFs expect simple types as input and output, we are going to convert the list of chunks to a json string inside the UDF." + ] + }, + { + "cell_type": "code", + "execution_count": 27, + "metadata": { + "id": "dc6cd9933cfd" + }, + "outputs": [], + "source": [ + "df_udf[\"full_text_chunk\"] = df_udf[\"full_text_md\"].apply(chunk_text_udf)" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "I0McPCZQFC84" + }, + "source": [ + "As we can see, the data type is now string." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "id": "VPSE7IR-rvok" + }, + "outputs": [], + "source": [ + "first_row_text = df_udf[\"full_text_chunk\"].iloc[0]\n", + "print(type(first_row_text))\n", + "first_row_text" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "Rusk8ojbtEl0" + }, + "source": [ + "To fix that, we can now use the BigFrames BQ `json_extract_string_array` method to convert the json string back to a list of string." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "id": "npMj9jbhrvok" + }, + "outputs": [], + "source": [ + "df_udf[\"full_text_chunk\"] = bbq.json_extract_string_array(df_udf[\"full_text_chunk\"])\n", + "\n", + "first_row_text = df_udf[\"full_text_chunk\"].iloc[0]\n", + "print(type(first_row_text))\n", + "first_row_text" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "v2E5pSvtrvok" + }, + "source": [ + "Here is how the DataFrame looks like after chunking." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "id": "2af3eef0d5a0" + }, + "outputs": [], + "source": [ + "df_udf.head(2)" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "2a4e033321ad" + }, + "source": [ + "# 8. Cleaning up\n", + "\n", + "Run this cell to clean up the resources created in this notebook." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "id": "3I3SqYIuFC84" + }, + "outputs": [], + "source": [ + "from google.cloud import bigquery_connection\n", + "\n", + "# Remove BigQuery dataset and tables\n", + "dataset = f\"{PROJECT_ID}.{DESTINATION_DATASET_ID}\"\n", + "dataset_object = bigquery.Dataset(dataset)\n", + "bq_client.delete_dataset(dataset_object, delete_contents=True, not_found_ok=True)\n", + "\n", + "# Remove BigTable remote function\n", + "!gcloud functions delete $chunk_text_udf.bigframes_cloud_function --region=$LOCATION --quiet\n", + "\n", + "# Remove BigQuery external connection\n", + "connection_client = bigquery_connection.ConnectionServiceClient()\n", + "connection_path = connection_client.connection_path(\n", + " project=PROJECT_ID, location=\"us\", connection=\"bigframes-default-connection\"\n", + ")\n", + "connection_client.delete_connection(name=connection_path)" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "I_-3K9iMIjLU" + }, + "source": [ + "# Conclusion\n", + "\n", + "This notebook showcased the power of **BigFrames** for building production-ready RAG pipelines on Google Cloud. We leveraged BigFrames' seamless integration with BigQuery and Vertex AI to efficiently process and embed large text datasets.\n", + "\n", + "Key takeaways highlighting BigFrames' capabilities include:\n", + "\n", + "* **Scalable Data Processing:** BigFrames allowed us to manipulate BigQuery data using familiar pandas-like syntax, whether processing in memory or through scalable remote functions for terabyte-scale datasets.\n", + "* **Simplified Embedding Generation:** BigFrames made it easy to generate embeddings with Vertex AI's embedding models directly within our data pipeline.\n", + "* **Efficient Data Management:** We used BigFrames to manage our embeddings in BigQuery, implementing incremental updates and deduplication for optimal performance." + ] + } + ], + "metadata": { + "colab": { + "name": "scalable_rag_with_bigframes.ipynb", + "toc_visible": true + }, + "kernelspec": { + "display_name": "Python 3", + "name": "python3" + } + }, + "nbformat": 4, + "nbformat_minor": 0 +}