Commit
half working state
pablonyx committed Mar 8, 2025
1 parent 746f55a commit a201d2a
Showing 4 changed files with 82 additions and 16 deletions.
34 changes: 27 additions & 7 deletions backend/onyx/connectors/confluence/connector.py
@@ -230,6 +230,7 @@ def _convert_page_to_document(self, page: dict[str, Any]) -> Document | None:
        Includes the page content, comments, and attachments.
        """
        try:
            print("converting page to document")
            # Extract basic page information
            page_id = page["id"]
            page_title = page["title"]
@@ -257,21 +258,28 @@ def _convert_page_to_document(self, page: dict[str, Any]) -> Document | None:
            )

            for attachment in attachments.get("results", []):
                print("attachment", attachment)
                # Process each attachment
                try:
                    result = process_attachment(
                        self.confluence_client,
                        attachment,
                        page_title,
                    )
                except Exception as e:
                    print("error", e)
                    # Skip this attachment; `result` would otherwise be unbound below
                    continue
                print("result", result)

                if result.text:
                    print("result.text", result.text)
                    # Create a section for the attachment text
                    attachment_section = TextSection(
                        text=result.text,
                        link=f"{page_url}#attachment-{attachment['id']}",
                    )
                    sections.append(attachment_section)
                elif result.file_name:
                    print("result.file_name", result.file_name)
                    # Create an ImageSection for image attachments
                    image_section = ImageSection(
                        text="",
@@ -284,6 +292,8 @@ def _convert_page_to_document(self, page: dict[str, Any]) -> Document | None:
                        f"Error processing attachment '{attachment.get('title')}': {result.error}"
                    )

            print("sections", sections)

            # Extract metadata
            metadata = {}
            if "space" in page:
@@ -372,20 +382,30 @@ def _fetch_document_batches(
                attachment=attachment,
                page_context=confluence_xml,
            )
            print("response", response)
            if response is None:
                print("response is None")
                continue

            content_text, file_storage_name = response

            print("content_text", content_text)
            object_url = build_confluence_document_id(
                self.wiki_base, attachment["_links"]["webui"], self.is_cloud
            )

            print("object_url", object_url)
            print("file_storage_name", file_storage_name)
            if content_text:
                doc.sections.append(
                    TextSection(
                        text=content_text,
                        link=object_url,
                    )
                )
            elif file_storage_name:
                print("file_storage_name", file_storage_name)
                doc.sections.append(
                    ImageSection(
                        text="",  # ImageSection requires text; left empty until summarization
                        link=object_url,
                        image_file_name=file_storage_name,
                    )
                )
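The connector code above branches on three fields of the AttachmentProcessingResult returned by process_attachment. The model's definition is not part of this diff; a minimal sketch consistent with the usage above (field names taken from the diff, types and defaults assumed) would be:

from pydantic import BaseModel


class AttachmentProcessingResult(BaseModel):
    # Extracted text for document-style attachments (PDF, DOCX, ...)
    text: str | None = None
    # File-store name for image attachments saved for later summarization
    file_name: str | None = None
    # Set when the attachment was skipped or failed to process
    error: str | None = None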
7 changes: 6 additions & 1 deletion backend/onyx/connectors/confluence/utils.py
@@ -108,9 +108,10 @@ def process_attachment(
    or if it's an image, stores it for later analysis. Returns a structured result.
    """
    try:
        print("attachment", attachment)
        # Get the media type from the attachment metadata
        media_type = attachment.get("metadata", {}).get("mediaType", "")
        print("media_type", media_type)
        # Validate the attachment type
        if not validate_attachment_filetype(attachment):
            return AttachmentProcessingResult(
@@ -134,13 +135,16 @@

        # Process document attachments
        try:
            print("Processing document attachment:", attachment["title"])
            text = extract_file_text(
                file=BytesIO(raw_bytes),
                file_name=attachment["title"],
            )
            print(f"Extracted {len(text)} characters from attachment")

            # Skip if the text is too long
            if len(text) > CONFLUENCE_CONNECTOR_ATTACHMENT_CHAR_COUNT_THRESHOLD:
                print(f"Attachment text exceeds threshold: {len(text)} chars")
                return AttachmentProcessingResult(
                    text=None,
                    file_name=None,
@@ -165,6 +169,7 @@ def _process_image_attachment(
    raw_bytes: bytes,
    media_type: str,
) -> AttachmentProcessingResult:
    """Process an image attachment by saving it without generating a summary."""
    print("processing image attachment")  # must come after the docstring, or the docstring is lost
    try:
        # Use the standardized image storage and section creation
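For orientation, a hypothetical caller of process_attachment would branch on the result the same way the connector does. This usage sketch assumes confluence_client and attachment come from the Confluence API and that the result fields match the diff above:

result = process_attachment(confluence_client, attachment, page_title)

if result.error:
    # Unsupported type, oversized text, or an extraction failure
    print(f"Skipped attachment: {result.error}")
elif result.text:
    # Document-style attachment: extracted text is ready to index
    print(f"Extracted {len(result.text)} characters")
elif result.file_name:
    # Image attachment: raw bytes were stored for later LLM summarization
    print(f"Image stored as {result.file_name}")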
5 changes: 3 additions & 2 deletions backend/onyx/connectors/models.py
@@ -37,11 +37,12 @@ class Section(BaseModel):


class TextSection(BaseModel):
    text: str
    link: str | None = None


class ImageSection(BaseModel):
    text: str
    link: str | None = None
    image_file_name: str

@@ -112,7 +113,7 @@ class DocumentBase(BaseModel):
    """Used for Onyx ingestion api, the ID is inferred before use if not provided"""

    id: str | None = None
    sections: list[TextSection | ImageSection | Section]
    source: DocumentSource | None = None
    semantic_identifier: str  # displayed in the UI as the main identifier for the doc
    metadata: dict[str, str | list[str]]
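With the widened union on DocumentBase.sections, a single document can now mix the section models. A small sketch (URLs and file names are placeholders):

from onyx.connectors.models import ImageSection, TextSection

sections = [
    TextSection(text="Page body text", link="https://wiki.example.com/page"),
    ImageSection(
        text="",  # empty until the indexing pipeline summarizes the image
        link="https://wiki.example.com/page#attachment-123",
        image_file_name="confluence_attachment_123.png",
    ),
]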
52 changes: 46 additions & 6 deletions backend/onyx/indexing/indexing_pipeline.py
@@ -18,7 +18,7 @@
from onyx.connectors.models import DocumentFailure
from onyx.connectors.models import ImageSection
from onyx.connectors.models import IndexAttemptMetadata
from onyx.connectors.models import Section
from onyx.db.document import fetch_chunk_counts_for_documents
from onyx.db.document import get_documents_by_ids
from onyx.db.document import mark_document_as_indexed_for_cc_pair__no_commit
@@ -330,33 +330,48 @@ def process_image_sections(documents: list[Document]) -> list[Document]:
    # Get the vision LLM
    llm = get_default_llm_with_vision()
    if not llm:
        print("No vision-capable LLM available. Image sections will not be processed.")
        logger.warning(
            "No vision-capable LLM available. Image sections will not be processed."
        )
        return documents

    for document in documents:
        processed_sections: list[Section] = []
        print(f"Processing document ID: {document.id}, Title: {document.title}")
        print(f"Document has {len(document.sections)} sections to process")

        for section in document.sections:
            # If it's not an ImageSection or doesn't have an image_file_name, keep as is
            if not isinstance(section, ImageSection) or not section.image_file_name:
                processed_sections.append(Section(text=section.text, link=section.link))
                print(
                    f"Skipping non-image section or section without image_file_name: {type(section)}"
                )
                continue

            print(
                f"Processing ImageSection with image_file_name: {section.image_file_name}"
            )
            # Get the image data from PGFileStore
            try:
                with get_session_with_current_tenant() as db_session:
                    print(
                        f"Looking up image file in PGFileStore: {section.image_file_name}"
                    )
                    pgfilestore = get_pgfilestore_by_file_name(
                        file_name=section.image_file_name, db_session=db_session
                    )
                    if not pgfilestore:
                        logger.warning(
                            f"Image file {section.image_file_name} not found in PGFileStore"
                        )
                        print(
                            f"WARNING: Image file {section.image_file_name} not found in PGFileStore"
                        )
                        # Keep the original section but without image processing
                        processed_sections.append(
                            Section(
                                text="[Image could not be processed]",
                                link=section.link,
                                image_file_name=section.image_file_name,
@@ -365,29 +380,46 @@ def process_image_sections(documents: list[Document]) -> list[Document]:
                        continue

                    # Get the image data
                    print(
                        f"Found pgfilestore with lobj_oid: {pgfilestore.lobj_oid}, display_name: {pgfilestore.display_name}"
                    )
                    pgfilestore_data = read_lobj(
                        pgfilestore.lobj_oid, db_session
                    ).read()
                    print(f"Read image data, size: {len(pgfilestore_data)} bytes")

                    # Summarize the image
                    print(
                        f"Attempting to summarize image with LLM: {type(llm).__name__}"
                    )
                    summary = summarize_image_with_error_handling(
                        llm=llm,
                        image_data=pgfilestore_data,
                        context_name=pgfilestore.display_name or "Image",
                    )
                    print(f"Image summary result: {'Success' if summary else 'Failed'}")
                    if summary:
                        print(f"Summary length: {len(summary)} chars")
                    else:
                        print("No summary was generated")

                    # Create a Section with the summary
                    processed_sections.append(
                        Section(
                            text=summary or "[Image could not be summarized]",
                            link=section.link,
                            image_file_name=section.image_file_name,
                        )
                    )
            except Exception as e:
                logger.error(f"Error processing image section: {e}")
                print(f"ERROR processing image section: {e}")
                print(f"Exception type: {type(e).__name__}")
                import traceback

                print(f"Traceback: {traceback.format_exc()}")
                processed_sections.append(
                    Section(
                        text="[Error processing image]",
                        link=section.link,
                        image_file_name=section.image_file_name,
@@ -396,6 +428,9 @@ def index_doc_batch(

        # Replace the document's sections with the processed ones
        document.sections = processed_sections
        print(
            f"Finished processing document ID: {document.id}, processed {len(processed_sections)} sections"
        )

    return documents

@@ -465,6 +500,11 @@ def index_doc_batch(

    # Process any ImageSection objects before chunking
    logger.debug("Processing image sections")
    print(f"Processing image sections for {len(ctx.updatable_docs)} documents")
    # new_docs = process_image_sections(ctx.updatable_docs)
    # for doc in new_docs:
    #     print(type(doc.sections[0]))

    ctx.updatable_docs = process_image_sections(ctx.updatable_docs)

    logger.debug("Starting chunking")
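Stripped of the debug prints, the per-image step that process_image_sections performs reduces to a summarize-or-fall-back transform. The sketch below reuses only names that appear in the diff and is illustrative, not a drop-in replacement:

def _summarize_image_section(llm, section: ImageSection) -> Section:
    """Replace a stored image with a text Section, degrading gracefully."""
    with get_session_with_current_tenant() as db_session:
        pgfilestore = get_pgfilestore_by_file_name(
            file_name=section.image_file_name, db_session=db_session
        )
        if not pgfilestore:
            # Image bytes were never stored; keep a placeholder
            return Section(
                text="[Image could not be processed]",
                link=section.link,
                image_file_name=section.image_file_name,
            )
        image_data = read_lobj(pgfilestore.lobj_oid, db_session).read()
    summary = summarize_image_with_error_handling(
        llm=llm,
        image_data=image_data,
        context_name=pgfilestore.display_name or "Image",
    )
    return Section(
        text=summary or "[Image could not be summarized]",
        link=section.link,
        image_file_name=section.image_file_name,
    )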
