Skip to content

Commit

Permalink
feat(artifact): adopt the advanced converting pipeline
Browse files Browse the repository at this point in the history
  • Loading branch information
Yougigun committed Nov 5, 2024
1 parent c630c10 commit f15928b
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 20 deletions.
26 changes: 14 additions & 12 deletions pkg/service/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,13 @@ const chunkLength = 1024
const chunkOverlap = 200
const NamespaceID = "preset"

// Note: this pipeline is for the old indexing pipeline
// Note: this is the ID of the old indexing pipeline, whose output contains convert_result
const ConvertDocToMDPipelineID = "indexing-convert-pdf"
const DocToMDVersion = "v1.1.1"

// TODO: we revert to the old pipeline. it will change to the new pipeline later.
const ConvertDocToMDPipelineID2 = "indexing-convert-pdf"

// TODO: we need to update the version after the new pipeline is ready
const DocToMDVersion2 = "v1.1.1"
// Note: this is the ID of the new indexing pipeline, whose output contains convert_result or convert_result2
const ConvertDocToMDPipelineID2 = "indexing-advanced-convert-doc"
const DocToMDVersion2 = "v1.2.0"

const MdChunkPipelineID = "indexing-split-markdown"
const MdSplitVersion = "v2.0.0"
Expand Down Expand Up @@ -144,8 +142,9 @@ func getFileTypePrefix(fileType artifactPb.FileType) string {
}
}

// Helper function to safely extract the "convert_result" from the response.
// It checks if the index and key are available to avoid nil pointer issues.
// getConvertResult extracts the conversion result from the pipeline response.
// It first checks for a non-empty "convert_result" field, then falls back to "convert_result2".
// Returns an error if neither field contains valid data or if the response structure is invalid.
func getConvertResult(resp *pipelinePb.TriggerNamespacePipelineReleaseResponse) (string, error) {
if resp == nil || len(resp.Outputs) == 0 {
return "", fmt.Errorf("response is nil or has no outputs. resp: %v", resp)
Expand All @@ -155,10 +154,14 @@ func getConvertResult(resp *pipelinePb.TriggerNamespacePipelineReleaseResponse)
return "", fmt.Errorf("fields in the output are nil. resp: %v", resp)
}
convertResult, ok := fields["convert_result"]
if !ok {
return "", fmt.Errorf("convert_result not found in the output fields. resp: %v", resp)
if ok && convertResult.GetStringValue() != "" {
return convertResult.GetStringValue(), nil
}
return convertResult.GetStringValue(), nil
convertResult2, ok2 := fields["convert_result2"]
if ok2 && convertResult2.GetStringValue() != "" {
return convertResult2.GetStringValue(), nil
}
return "", fmt.Errorf("convert_result or convert_result2 not found in the output fields. resp: %v", resp)
}

type Chunk = struct {
Expand Down Expand Up @@ -365,7 +368,6 @@ func (s *Service) EmbeddingTextPipe(ctx context.Context, caller uuid.UUID, reque
batch := texts[i:end]
batchIndex := i / maxBatchSize


// Acquire semaphore before starting goroutine
sem <- struct{}{}
wg.Add(1)
Expand Down
55 changes: 47 additions & 8 deletions pkg/worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -490,11 +490,37 @@ func (wp *fileToEmbWorkerPool) processConvertingFile(ctx context.Context, file r
return updatedFile, artifactpb.FileProcessStatus_FILE_PROCESS_STATUS_CHUNKING, nil
}

// Processes a file with the status "chunking".
// If the file is a PDF or other document type, it retrieves the converted file from MinIO and calls the markdown chunking pipeline.
// If the file is a text or markdown file, it retrieves the file from MinIO and calls the respective chunking pipeline.
// The resulting chunks are saved into object storage and metadata is updated in the database.
// Finally, the file status is updated to "embedding" in the database.
// processChunkingFile processes a file with the status "chunking" by splitting it into text chunks.
// The processing varies by file type:
//
// For PDF, DOC, DOCX, PPT, PPTX, HTML, XLSX, XLS, CSV:
// - Retrieves converted file from MinIO
// - For spreadsheet files (XLSX, XLS, CSV): Uses markdown chunking pipeline
// - For other document types: Uses text chunking pipeline
//
// For TEXT files:
// - Retrieves original file from MinIO
// - Uses text chunking pipeline
//
// For MARKDOWN files:
// - Retrieves original file from MinIO
// - Uses markdown chunking pipeline
//
// For all file types:
// - Saves chunks to object storage
// - Updates metadata in database with chunking pipeline info
// - Updates file status to "embedding"
//
// Parameters:
// - ctx: Context for the operation
// - file: KnowledgeBaseFile struct containing file metadata
//
// Returns:
// - updatedFile: Updated KnowledgeBaseFile after processing
// - nextStatus: Next file process status (EMBEDDING if successful)
// - err: Error if any step fails
//
// The function handles errors at each step and returns appropriate status codes.
func (wp *fileToEmbWorkerPool) processChunkingFile(ctx context.Context, file repository.KnowledgeBaseFile) (*repository.KnowledgeBaseFile, artifactpb.FileProcessStatus, error) {
logger, _ := logger.GetZapLogger(ctx)
logger.Info("Processing chunking status file.", zap.String("File uid", file.UID.String()))
Expand Down Expand Up @@ -527,10 +553,23 @@ func (wp *fileToEmbWorkerPool) processChunkingFile(ctx context.Context, file rep
return nil, artifactpb.FileProcessStatus_FILE_PROCESS_STATUS_UNSPECIFIED, err
}

// TODO: some file use splitTextPipe and some use splitMarkdownPipe
// call the markdown chunking pipeline
requesterUID := file.RequesterUID
chunks, err := wp.svc.SplitMarkdownPipe(ctx, file.CreatorUID, requesterUID, string(convertedFileData))
chunks := []service.Chunk{}
switch file.Type {
case artifactpb.FileType_FILE_TYPE_XLSX.String(),
artifactpb.FileType_FILE_TYPE_XLS.String(),
artifactpb.FileType_FILE_TYPE_CSV.String():
requesterUID := file.RequesterUID
chunks, err = wp.svc.SplitMarkdownPipe(ctx, file.CreatorUID, requesterUID, string(convertedFileData))
case artifactpb.FileType_FILE_TYPE_PDF.String(),
artifactpb.FileType_FILE_TYPE_DOCX.String(),
artifactpb.FileType_FILE_TYPE_DOC.String(),
artifactpb.FileType_FILE_TYPE_PPTX.String(),
artifactpb.FileType_FILE_TYPE_PPT.String(),
artifactpb.FileType_FILE_TYPE_HTML.String():
requesterUID := file.RequesterUID
chunks, err = wp.svc.SplitTextPipe(ctx, file.CreatorUID, requesterUID, string(convertedFileData))
}
if err != nil {
logger.Error("Failed to get chunks from converted file using markdown chunking pipeline.", zap.String("Converted file uid", convertedFile.UID.String()))
return nil, artifactpb.FileProcessStatus_FILE_PROCESS_STATUS_UNSPECIFIED, err
Expand Down

0 comments on commit f15928b

Please sign in to comment.