Skip to content

Commit

Permalink
fix(catalog): call embedding with max 32 size batch (#60)
Browse files Browse the repository at this point in the history
Because

pipleine has limit size of batch, it causes calling embedding pipeline
failed.

This commit

limits the max batch size to 32
  • Loading branch information
Yougigun authored Aug 1, 2024
1 parent 786790c commit e6b25ec
Showing 1 changed file with 39 additions and 26 deletions.
65 changes: 39 additions & 26 deletions pkg/service/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,32 +169,45 @@ func (s *Service) SplitText(ctx context.Context, caller uuid.UUID, text string)

// VectorizeText using embedding pipeline to vectorize text and consume caller's credits
func (s *Service) VectorizeText(ctx context.Context, caller uuid.UUID, texts []string) ([][]float32, error) {
md := metadata.New(map[string]string{"Instill-User-Uid": caller.String(), "Instill-Auth-Type": "user"})
ctx = metadata.NewOutgoingContext(ctx, md)
inputs := make([]*structpb.Struct, 0, len(texts))
for _, text := range texts {
inputs = append(inputs, &structpb.Struct{
Fields: map[string]*structpb.Value{
"chunk_input": {Kind: &structpb.Value_StringValue{StringValue: text}},
},
})
}

req := &pipelinev1beta.TriggerNamespacePipelineReleaseRequest{
NamespaceId: NamespaceID,
PipelineId: TextEmbedPipelineID,
ReleaseId: TextEmbedVersion,
Inputs: inputs,
}
res, err := s.PipelinePub.TriggerNamespacePipelineRelease(ctx, req)
if err != nil {
return nil, fmt.Errorf("failed to trigger %s pipeline. err:%w", TextEmbedPipelineID, err)
}
result, err := GetVectorFromResponse(res)
if err != nil {
return nil, err
}
return result, nil
const maxBatchSize = 32
md := metadata.New(map[string]string{"Instill-User-Uid": caller.String(), "Instill-Auth-Type": "user"})
ctx = metadata.NewOutgoingContext(ctx, md)
var allResults [][]float32

for i := 0; i < len(texts); i += maxBatchSize {
end := i + maxBatchSize
if end > len(texts) {
end = len(texts)
}
batch := texts[i:end]

inputs := make([]*structpb.Struct, 0, len(batch))
for _, text := range batch {
inputs = append(inputs, &structpb.Struct{
Fields: map[string]*structpb.Value{
"chunk_input": {Kind: &structpb.Value_StringValue{StringValue: text}},
},
})
}

req := &pipelinev1beta.TriggerNamespacePipelineReleaseRequest{
NamespaceId: NamespaceID,
PipelineId: TextEmbedPipelineID,
ReleaseId: TextEmbedVersion,
Inputs: inputs,
}
res, err := s.PipelinePub.TriggerNamespacePipelineRelease(ctx, req)
if err != nil {
return nil, fmt.Errorf("failed to trigger %s pipeline. err:%w", TextEmbedPipelineID, err)
}
result, err := GetVectorFromResponse(res)
if err != nil {
return nil, err
}
allResults = append(allResults, result...)
}

return allResults, nil
}

// GetVectorFromResponse converts the pipeline response into a slice of float32.
Expand Down

0 comments on commit e6b25ec

Please sign in to comment.