diff --git a/backend/onyx/connectors/confluence/utils.py b/backend/onyx/connectors/confluence/utils.py index 600808453e9..18742438696 100644 --- a/backend/onyx/connectors/confluence/utils.py +++ b/backend/onyx/connectors/confluence/utils.py @@ -182,6 +182,8 @@ def _process_image_attachment( media_type=media_type, file_origin=FileOrigin.CONNECTOR, ) + print(f"Stored image attachment with file name: {file_name}") + logger.info(f"Stored image attachment with file name: {file_name}") # Return empty text but include the file_name for later processing return AttachmentProcessingResult(text="", file_name=file_name, error=None) diff --git a/backend/onyx/connectors/models.py b/backend/onyx/connectors/models.py index 911120001b4..289a2b91c54 100644 --- a/backend/onyx/connectors/models.py +++ b/backend/onyx/connectors/models.py @@ -42,14 +42,14 @@ class TextSection(Section): """Section containing text content""" text: str - image_file_name: None = None + link: str | None = None class ImageSection(Section): """Section containing an image reference""" image_file_name: str - text: None = None + link: str | None = None class BasicExpertInfo(BaseModel): @@ -169,7 +169,9 @@ def get_metadata_str_attributes(self) -> list[str] | None: class Document(DocumentBase): - id: str # This must be unique or during indexing/reindexing, chunks will be overwritten + """Used for Onyx ingestion api, the ID is required""" + + id: str source: DocumentSource def get_total_char_length(self) -> int: @@ -207,6 +209,32 @@ def from_base(cls, base: DocumentBase) -> "Document": ) +class IndexingDocument(Document): + """Document with processed sections for indexing""" + + processed_sections: list[Section] = [] + + def get_total_char_length(self) -> int: + """Get the total character length of the document including processed sections""" + title_len = len(self.title or self.semantic_identifier) + + # Use processed_sections if available, otherwise fall back to original sections + if self.processed_sections: + section_len = sum( + len(section.text) if section.text is not None else 0 + for section in self.processed_sections + ) + else: + section_len = sum( + len(section.text) + if isinstance(section, TextSection) and section.text is not None + else 0 + for section in self.sections + ) + + return title_len + section_len + + class SlimDocument(BaseModel): id: str perm_sync_data: Any | None = None diff --git a/backend/onyx/connectors/vision_enabled_connector.py b/backend/onyx/connectors/vision_enabled_connector.py deleted file mode 100644 index 021fb759b2e..00000000000 --- a/backend/onyx/connectors/vision_enabled_connector.py +++ /dev/null @@ -1,45 +0,0 @@ -""" -Mixin for connectors that need vision capabilities. -""" -from onyx.configs.llm_configs import get_image_extraction_and_analysis_enabled -from onyx.llm.factory import get_default_llm_with_vision -from onyx.llm.interfaces import LLM -from onyx.utils.logger import setup_logger - -logger = setup_logger() - - -class VisionEnabledConnector: - """ - Mixin for connectors that need vision capabilities. - - This mixin provides a standard way to initialize a vision-capable LLM - for image analysis during indexing. - - Usage: - class MyConnector(LoadConnector, VisionEnabledConnector): - def __init__(self, ...): - super().__init__(...) - self.initialize_vision_llm() - """ - - def initialize_vision_llm(self) -> None: - """ - Initialize a vision-capable LLM if enabled by configuration. - - Sets self.image_analysis_llm to the LLM instance or None if disabled. - """ - self.image_analysis_llm: LLM | None = None - if get_image_extraction_and_analysis_enabled(): - try: - self.image_analysis_llm = get_default_llm_with_vision() - if self.image_analysis_llm is None: - logger.warning( - "No LLM with vision found; image summarization will be disabled" - ) - except Exception as e: - logger.warning( - f"Failed to initialize vision LLM due to an error: {str(e)}. " - "Image summarization will be disabled." - ) - self.image_analysis_llm = None diff --git a/backend/onyx/db/pg_file_store.py b/backend/onyx/db/pg_file_store.py index 5d31358b491..7096d92c5c8 100644 --- a/backend/onyx/db/pg_file_store.py +++ b/backend/onyx/db/pg_file_store.py @@ -67,6 +67,9 @@ def read_lobj( use_tempfile: bool = False, ) -> IO: pg_conn = get_pg_conn_from_session(db_session) + # Ensure we're using binary mode by default for large objects + if mode is None: + mode = "rb" large_object = ( pg_conn.lobject(lobj_oid, mode=mode) if mode else pg_conn.lobject(lobj_oid) ) @@ -81,6 +84,7 @@ def read_lobj( temp_file.seek(0) return temp_file else: + # Ensure we're getting raw bytes without text decoding return BytesIO(large_object.read()) diff --git a/backend/onyx/indexing/chunker.py b/backend/onyx/indexing/chunker.py index 3d190aae858..0dea6fa1259 100644 --- a/backend/onyx/indexing/chunker.py +++ b/backend/onyx/indexing/chunker.py @@ -9,8 +9,8 @@ from onyx.connectors.cross_connector_utils.miscellaneous_utils import ( get_metadata_keys_to_ignore, ) -from onyx.connectors.models import Document -from onyx.connectors.models import ImageSection +from onyx.connectors.models import IndexingDocument +from onyx.connectors.models import Section from onyx.indexing.indexing_heartbeat import IndexingHeartbeatInterface from onyx.indexing.models import DocAwareChunk from onyx.natural_language_processing.utils import BaseTokenizer @@ -195,7 +195,7 @@ def _get_mini_chunk_texts(self, chunk_text: str) -> list[str] | None: # ADDED: extra param image_url to store in the chunk def _create_chunk( self, - document: Document, + document: IndexingDocument, chunks_list: list[DocAwareChunk], text: str, links: dict[int, str], @@ -226,31 +226,46 @@ def _create_chunk( def _chunk_document( self, - document: Document, + document: IndexingDocument, title_prefix: str, metadata_suffix_semantic: str, metadata_suffix_keyword: str, content_token_limit: int, ) -> list[DocAwareChunk]: """ - Loops through sections of the document, converting them into one or more chunks. - If a section has an image_link, we treat it as a dedicated chunk. + Legacy method for backward compatibility. + Calls _chunk_document_with_sections with document.sections. """ + return self._chunk_document_with_sections( + document, + document.processed_sections, + title_prefix, + metadata_suffix_semantic, + metadata_suffix_keyword, + content_token_limit, + ) + def _chunk_document_with_sections( + self, + document: IndexingDocument, + sections: list[Section], + title_prefix: str, + metadata_suffix_semantic: str, + metadata_suffix_keyword: str, + content_token_limit: int, + ) -> list[DocAwareChunk]: + """ + Loops through sections of the document, converting them into one or more chunks. + Works with processed sections that are base Section objects. + """ chunks: list[DocAwareChunk] = [] link_offsets: dict[int, str] = {} chunk_text = "" - for section_idx, section in enumerate(document.sections): - # Handle different section types - if isinstance(section, ImageSection): - # Skip ImageSection in chunking - these should be processed separately - continue - - # Get section text - will be None for ImageSection + for section_idx, section in enumerate(sections): + # Get section text and other attributes section_text = clean_text(section.text or "") section_link_text = section.link or "" - # Get image file name if present image_url = section.image_file_name # If there is no useful content, skip @@ -261,7 +276,7 @@ def _chunk_document( ) continue - # CASE 1: If this is an image section, force a separate chunk + # CASE 1: If this section has an image, force a separate chunk if image_url: # First, if we have any partially built text chunk, finalize it if chunk_text.strip(): @@ -278,15 +293,13 @@ def _chunk_document( chunk_text = "" link_offsets = {} - # Create a chunk specifically for this image - # (If the section has text describing the image, use that as content) + # Create a chunk specifically for this image section + # (Using the text summary that was generated during processing) self._create_chunk( document, chunks, section_text, - links={0: section_link_text} - if section_link_text - else {}, # No text offsets needed for images + links={0: section_link_text} if section_link_text else {}, image_file_name=image_url, title_prefix=title_prefix, metadata_suffix_semantic=metadata_suffix_semantic, @@ -391,7 +404,9 @@ def _chunk_document( ) return chunks - def _handle_single_document(self, document: Document) -> list[DocAwareChunk]: + def _handle_single_document( + self, document: IndexingDocument + ) -> list[DocAwareChunk]: # Specifically for reproducing an issue with gmail if document.source == DocumentSource.GMAIL: logger.debug(f"Chunking {document.semantic_identifier}") @@ -427,9 +442,12 @@ def _handle_single_document(self, document: Document) -> list[DocAwareChunk]: title_prefix = "" metadata_suffix_semantic = "" - # Chunk the document - normal_chunks = self._chunk_document( + # Use processed_sections if available (IndexingDocument), otherwise use original sections + sections_to_chunk = document.processed_sections + + normal_chunks = self._chunk_document_with_sections( document, + sections_to_chunk, title_prefix, metadata_suffix_semantic, metadata_suffix_keyword, @@ -443,10 +461,12 @@ def _handle_single_document(self, document: Document) -> list[DocAwareChunk]: return normal_chunks - def chunk(self, documents: list[Document]) -> list[DocAwareChunk]: + def chunk(self, documents: list[IndexingDocument]) -> list[DocAwareChunk]: """ Takes in a list of documents and chunks them into smaller chunks for indexing while persisting the document metadata. + + Works with both standard Document objects and IndexingDocument objects with processed_sections. """ final_chunks: list[DocAwareChunk] = [] for document in documents: diff --git a/backend/onyx/indexing/indexing_pipeline.py b/backend/onyx/indexing/indexing_pipeline.py index 4f6d8b25d5c..d385bbbc2c9 100644 --- a/backend/onyx/indexing/indexing_pipeline.py +++ b/backend/onyx/indexing/indexing_pipeline.py @@ -19,6 +19,8 @@ from onyx.connectors.models import DocumentFailure from onyx.connectors.models import ImageSection from onyx.connectors.models import IndexAttemptMetadata +from onyx.connectors.models import IndexingDocument +from onyx.connectors.models import Section from onyx.connectors.models import TextSection from onyx.db.document import fetch_chunk_counts_for_documents from onyx.db.document import get_documents_by_ids @@ -61,6 +63,7 @@ class DocumentBatchPrepareContext(BaseModel): updatable_docs: list[Document] id_to_db_doc_map: dict[str, DBDocument] + indexable_docs: list[IndexingDocument] = [] model_config = ConfigDict(arbitrary_types_allowed=True) @@ -326,117 +329,147 @@ def filter_documents(document_batch: list[Document]) -> list[Document]: return documents -def process_image_sections(documents: list[Document]) -> list[Document]: +def process_image_sections(documents: list[Document]) -> list[IndexingDocument]: """ - Process all ImageSection objects in the documents by generating text summaries - using a vision-capable LLM. + Process all sections in documents by: + 1. Converting both TextSection and ImageSection objects to base Section objects + 2. Processing ImageSections to generate text summaries using a vision-capable LLM + 3. Returning IndexingDocument objects with both original and processed sections Args: - documents: List of documents that may contain ImageSection objects + documents: List of documents with TextSection | ImageSection objects Returns: - The same documents with ImageSection objects converted to TextSection objects - with summarized text. + List of IndexingDocument objects with processed_sections as list[Section] """ # Get the vision LLM llm = get_default_llm_with_vision() if not llm: - print("No vision-capable LLM available. Image sections will not be processed.") logger.warning( "No vision-capable LLM available. Image sections will not be processed." ) - return documents + print("No vision-capable LLM available. Image sections will not be processed.") + + # Even without LLM, we still convert to IndexingDocument with base Sections + return [ + IndexingDocument( + **document.dict(), + processed_sections=[ + Section( + text=section.text if isinstance(section, TextSection) else None, + link=section.link, + image_file_name=section.image_file_name + if isinstance(section, ImageSection) + else None, + ) + for section in document.sections + ], + ) + for document in documents + ] + + indexed_documents: list[IndexingDocument] = [] for document in documents: - processed_sections: list[TextSection | ImageSection] = [] + processed_sections: list[Section] = [] print(f"Processing document ID: {document.id}, Title: {document.title}") print(f"Document has {len(document.sections)} sections to process") for section in document.sections: - # If it's not an ImageSection or doesn't have an image_file_name, keep as is - if not isinstance(section, ImageSection): - processed_sections.append(section) - print(f"Skipping non-image section: {type(section)}") - continue + # For ImageSection, process and create base Section with both text and image_file_name + if isinstance(section, ImageSection): + print( + f"Processing ImageSection with image_file_name: {section.image_file_name}" + ) - print( - f"Processing ImageSection with image_file_name: {section.image_file_name}" - ) - # Get the image data from PGFileStore - try: - with get_session_with_current_tenant() as db_session: - print( - f"Looking up image file in PGFileStore: {section.image_file_name}" - ) - pgfilestore = get_pgfilestore_by_file_name( - file_name=section.image_file_name, db_session=db_session - ) - if not pgfilestore: - logger.warning( - f"Image file {section.image_file_name} not found in PGFileStore" - ) + # Default section with image path preserved + processed_section = Section( + link=section.link, + image_file_name=section.image_file_name, + text=None, # Will be populated if summarization succeeds + ) + + # Try to get image summary + try: + with get_session_with_current_tenant() as db_session: print( - f"WARNING: Image file {section.image_file_name} not found in PGFileStore" + f"Looking up image file in PGFileStore: {section.image_file_name}" ) - # Keep the original section but without image processing - processed_sections.append( - TextSection( - text="[Image could not be processed]", - link=section.link, - ) + pgfilestore = get_pgfilestore_by_file_name( + file_name=section.image_file_name, db_session=db_session ) - continue - # Get the image data - print( - f"Found pgfilestore with lobj_oid: {pgfilestore.lobj_oid}, display_name: {pgfilestore.display_name}" - ) - pgfilestore_data = read_lobj( - pgfilestore.lobj_oid, db_session - ).read() - print(f"Read image data, size: {len(pgfilestore_data)} bytes") - - # Summarize the image - print( - f"Attempting to summarize image with LLM: {type(llm).__name__}" - ) - summary = summarize_image_with_error_handling( - llm=llm, - image_data=pgfilestore_data, - context_name=pgfilestore.display_name or "Image", - ) - print(f"Image summary result: {'Success' if summary else 'Failed'}") - if summary: - print(f"Summary length: {len(summary)} chars") - else: - print("No summary was generated") - - # Create a TextSection with the summary - processed_sections.append( - TextSection( - text=summary or "[Image could not be summarized]", - link=section.link, - ) - ) - except Exception as e: - logger.error(f"Error processing image section: {e}") - print(f"ERROR processing image section: {e}") - print(f"Exception type: {type(e).__name__}") - print(f"Traceback: {traceback.format_exc()}") - processed_sections.append( - TextSection( - text="[Error processing image]", - link=section.link, - ) + if not pgfilestore: + logger.warning( + f"Image file {section.image_file_name} not found in PGFileStore" + ) + print( + f"WARNING: Image file {section.image_file_name} not found in PGFileStore" + ) + processed_section.text = "[Image could not be processed]" + else: + # Get the image data + image_data_io = read_lobj( + pgfilestore.lobj_oid, db_session, mode="rb" + ) + pgfilestore_data = image_data_io.read() + print( + f"Read image data, size: {len(pgfilestore_data)} bytes" + ) + + # Summarize the image + print( + f"Attempting to summarize image with LLM: {type(llm).__name__}" + ) + summary = summarize_image_with_error_handling( + llm=llm, + image_data=pgfilestore_data, + context_name=pgfilestore.display_name or "Image", + ) + + print( + f"Image summary result: {'Success' if summary else 'Failed'}" + ) + if summary: + print(f"Summary length: {len(summary)} chars") + processed_section.text = summary + else: + print("No summary was generated") + processed_section.text = ( + "[Image could not be summarized]" + ) + except Exception as e: + logger.error(f"Error processing image section: {e}") + print(f"ERROR processing image section: {e}") + print(f"Exception type: {type(e).__name__}") + print(f"Traceback: {traceback.format_exc()}") + processed_section.text = "[Error processing image]" + + processed_sections.append(processed_section) + + # For TextSection, create a base Section with text and link + elif isinstance(section, TextSection): + print("Processing TextSection") + processed_section = Section( + text=section.text, link=section.link, image_file_name=None ) + processed_sections.append(processed_section) + + # If it's already a base Section (unlikely), just append it + else: + processed_sections.append(section) + + # Create IndexingDocument with original sections and processed_sections + indexed_document = IndexingDocument( + **document.dict(), processed_sections=processed_sections + ) + indexed_documents.append(indexed_document) - # Replace the document's sections with the processed ones - document.sections = processed_sections print( f"Finished processing document ID: {document.id}, processed {len(processed_sections)} sections" ) - return documents + return indexed_documents @log_function_time(debug_only=True) @@ -505,16 +538,14 @@ def index_doc_batch( # Process any ImageSection objects before chunking logger.debug("Processing image sections") print(f"Processing image sections for {len(ctx.updatable_docs)} documents") - # new_docs = process_image_sections(ctx.updatable_docs) - # for doc in new_docs: - # print(type(doc.sections[0])) - ctx.updatable_docs = process_image_sections(ctx.updatable_docs) + # Convert documents to IndexingDocument objects with processed sections + ctx.indexable_docs = process_image_sections(ctx.updatable_docs) logger.debug("Starting chunking") # NOTE: no special handling for failures here, since the chunker is not # a common source of failure for the indexing pipeline - chunks: list[DocAwareChunk] = chunker.chunk(ctx.updatable_docs) + chunks: list[DocAwareChunk] = chunker.chunk(ctx.indexable_docs) logger.debug("Starting embedding") chunks_with_embeddings, embedding_failures = ( diff --git a/backend/tests/unit/onyx/indexing/test_chunker.py b/backend/tests/unit/onyx/indexing/test_chunker.py index 1143517389e..57ba3fe1241 100644 --- a/backend/tests/unit/onyx/indexing/test_chunker.py +++ b/backend/tests/unit/onyx/indexing/test_chunker.py @@ -5,6 +5,7 @@ from onyx.connectors.models import TextSection from onyx.indexing.chunker import Chunker from onyx.indexing.embedder import DefaultIndexingEmbedder +from onyx.indexing.indexing_pipeline import process_image_sections from tests.unit.onyx.indexing.conftest import MockHeartbeat @@ -42,12 +43,13 @@ def test_chunk_document(embedder: DefaultIndexingEmbedder) -> None: TextSection(text=short_section_4, link="link5"), ], ) + indexing_documents = process_image_sections([document]) chunker = Chunker( tokenizer=embedder.embedding_model.tokenizer, enable_multipass=False, ) - chunks = chunker.chunk([document]) + chunks = chunker.chunk(indexing_documents) assert len(chunks) == 5 assert short_section_1 in chunks[0].content @@ -70,6 +72,7 @@ def test_chunker_heartbeat( TextSection(text="This is a short section.", link="link1"), ], ) + indexing_documents = process_image_sections([document]) chunker = Chunker( tokenizer=embedder.embedding_model.tokenizer, @@ -77,7 +80,7 @@ def test_chunker_heartbeat( callback=mock_heartbeat, ) - chunks = chunker.chunk([document]) + chunks = chunker.chunk(indexing_documents) assert mock_heartbeat.call_count == 1 assert len(chunks) > 0