Skip to content

Commit

Permalink
feat(catalog): support concurrent text-to-embedding process (#85)
Browse files Browse the repository at this point in the history
Because 

the chunk-to-embedding process is too slow

This commit 

sends concurrent requests to the embedding pipeline, with a batch size
of 32 per request.
  • Loading branch information
Yougigun authored Aug 29, 2024
1 parent 8d844b3 commit 12d313c
Show file tree
Hide file tree
Showing 9 changed files with 577 additions and 383 deletions.
60 changes: 31 additions & 29 deletions cmd/main/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,9 @@ import (
"google.golang.org/protobuf/encoding/protojson"
"gorm.io/gorm"

grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
grpc_zap "github.com/grpc-ecosystem/go-grpc-middleware/logging/zap"
grpc_recovery "github.com/grpc-ecosystem/go-grpc-middleware/recovery"
grpcMiddleware "github.com/grpc-ecosystem/go-grpc-middleware"
grpcZap "github.com/grpc-ecosystem/go-grpc-middleware/logging/zap"
grpcRecovery "github.com/grpc-ecosystem/go-grpc-middleware/recovery"

"github.com/instill-ai/artifact-backend/config"
"github.com/instill-ai/artifact-backend/pkg/acl"
Expand All @@ -47,14 +47,14 @@ import (
"github.com/instill-ai/artifact-backend/pkg/usage"
"github.com/instill-ai/artifact-backend/pkg/worker"

grpcclient "github.com/instill-ai/artifact-backend/pkg/client/grpc"
httpclient "github.com/instill-ai/artifact-backend/pkg/client/http"
grpcClient "github.com/instill-ai/artifact-backend/pkg/client/grpc"
httpClient "github.com/instill-ai/artifact-backend/pkg/client/http"
database "github.com/instill-ai/artifact-backend/pkg/db"
custom_otel "github.com/instill-ai/artifact-backend/pkg/logger/otel"
customOtel "github.com/instill-ai/artifact-backend/pkg/logger/otel"
minio "github.com/instill-ai/artifact-backend/pkg/minio"
artifactPB "github.com/instill-ai/protogen-go/artifact/artifact/v1alpha"
mgmtv1beta "github.com/instill-ai/protogen-go/core/mgmt/v1beta"
pipelinev1beta "github.com/instill-ai/protogen-go/vdp/pipeline/v1beta"
mgmtPB "github.com/instill-ai/protogen-go/core/mgmt/v1beta"
pipelinePB "github.com/instill-ai/protogen-go/vdp/pipeline/v1beta"
)

var propagator propagation.TextMapPropagator
Expand Down Expand Up @@ -94,7 +94,7 @@ func main() {
// setup tracing and metrics
ctx, cancel := context.WithCancel(context.Background())

tp, err := custom_otel.SetupTracing(ctx, "artifact-backend")
tp, err := customOtel.SetupTracing(ctx, "artifact-backend")
if err != nil {
panic(err)
}
Expand All @@ -115,7 +115,7 @@ func main() {
}()

// verbosity 3 will avoid [transport] from emitting
grpc_zap.ReplaceGrpcLoggerV2WithVerbosity(logger, 3)
grpcZap.ReplaceGrpcLoggerV2WithVerbosity(logger, 3)

// Initialize clients needed for service
pipelinePublicServiceClient, pipelinePublicGrpcConn, _, mgmtPublicServiceClientConn, mgmtPrivateServiceClient, mgmtPrivateServiceGrpcConn,
Expand Down Expand Up @@ -145,7 +145,7 @@ func main() {
minioClient,
mgmtPrivateServiceClient,
pipelinePublicServiceClient,
httpclient.NewRegistryClient(ctx),
httpClient.NewRegistryClient(ctx),
redisClient,
milvusClient,
aclClient)
Expand Down Expand Up @@ -183,12 +183,12 @@ func main() {

// activate file-to-embeddings worker pool
wp := worker.NewFileToEmbWorkerPool(ctx, service, config.Config.FileToEmbeddingWorker.NumberOfWorkers)
wp.Start(ctx)
wp.Start()

// Start usage reporter
var usg usage.Usage
if config.Config.Server.Usage.Enabled {
usageServiceClient, usageServiceClientConn := grpcclient.NewUsageClient(ctx)
usageServiceClient, usageServiceClientConn := grpcClient.NewUsageClient(ctx)
if usageServiceClientConn != nil {
defer usageServiceClientConn.Close()
logger.Info("try to start usage reporter")
Expand Down Expand Up @@ -277,15 +277,17 @@ func main() {
logger.Info("Shutting down server...")
publicGrpcS.GracefulStop()
wp.GraceFulStop()
logger.Info("server shutdown 1")
}
fmt.Println("server shutdown 2")
}

func newClients(ctx context.Context, logger *zap.Logger) (
pipelinev1beta.PipelinePublicServiceClient,
pipelinePB.PipelinePublicServiceClient,
*grpc.ClientConn,
mgmtv1beta.MgmtPublicServiceClient,
mgmtPB.MgmtPublicServiceClient,
*grpc.ClientConn,
mgmtv1beta.MgmtPrivateServiceClient,
mgmtPB.MgmtPrivateServiceClient,
*grpc.ClientConn,
*redis.Client,
influxdb2.Client,
Expand All @@ -298,25 +300,25 @@ func newClients(ctx context.Context, logger *zap.Logger) (
) {

// init pipeline grpc client
pipelinePublicGrpcConn, err := grpcclient.NewGRPCConn(
pipelinePublicGrpcConn, err := grpcClient.NewGRPCConn(
fmt.Sprintf("%v:%v", config.Config.PipelineBackend.Host,
config.Config.PipelineBackend.PublicPort),
config.Config.PipelineBackend.HTTPS.Cert,
config.Config.PipelineBackend.HTTPS.Key)
if err != nil {
logger.Fatal(fmt.Sprintf("failed to create pipeline public grpc client: %v", err))
}
pipelinePublicServiceClient := pipelinev1beta.NewPipelinePublicServiceClient(pipelinePublicGrpcConn)
pipelinePublicServiceClient := pipelinePB.NewPipelinePublicServiceClient(pipelinePublicGrpcConn)

// initialize mgmt clients
mgmtPrivateServiceClient, mgmtPrivateServiceClientConn := grpcclient.NewMGMTPrivateClient(ctx)
mgmtPublicServiceClient, mgmtPublicServiceClientConn := grpcclient.NewMGMTPublicClient(ctx)
mgmtPrivateServiceClient, mgmtPrivateServiceClientConn := grpcClient.NewMGMTPrivateClient(ctx)
mgmtPublicServiceClient, mgmtPublicServiceClientConn := grpcClient.NewMGMTPublicClient(ctx)

// Initialize redis client
redisClient := redis.NewClient(&config.Config.Cache.Redis.RedisOptions)

// Initialize InfluxDB client
influxDBClient, influxDBWriteClient := httpclient.NewInfluxDBClient(ctx)
influxDBClient, influxDBWriteClient := httpClient.NewInfluxDBClient(ctx)

influxErrCh := influxDBWriteClient.Errors()
go func() {
Expand Down Expand Up @@ -356,8 +358,8 @@ func newClients(ctx context.Context, logger *zap.Logger) (

func newGrpcOptionAndCreds(logger *zap.Logger) ([]grpc.ServerOption, credentials.TransportCredentials) {
// Shared options for the logger, with a custom gRPC code to log level function.
opts := []grpc_zap.Option{
grpc_zap.WithDecider(func(fullMethodName string, err error) bool {
opts := []grpcZap.Option{
grpcZap.WithDecider(func(fullMethodName string, err error) bool {
// will not log gRPC calls if it was a call to liveness or readiness and no error was raised
if err == nil {
if match, _ := regexp.MatchString("artifact.artifact.v1alpha.ArtifactPublicService/.*ness$", fullMethodName); match {
Expand All @@ -373,15 +375,15 @@ func newGrpcOptionAndCreds(logger *zap.Logger) ([]grpc.ServerOption, credentials
}),
}
grpcServerOpts := []grpc.ServerOption{
grpc.StreamInterceptor(grpc_middleware.ChainStreamServer(
grpc.StreamInterceptor(grpcMiddleware.ChainStreamServer(
middleware.StreamAppendMetadataInterceptor,
grpc_zap.StreamServerInterceptor(logger, opts...),
grpc_recovery.StreamServerInterceptor(middleware.RecoveryInterceptorOpt()),
grpcZap.StreamServerInterceptor(logger, opts...),
grpcRecovery.StreamServerInterceptor(middleware.RecoveryInterceptorOpt()),
)),
grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer(
grpc.UnaryInterceptor(grpcMiddleware.ChainUnaryServer(
middleware.UnaryAppendMetadataAndErrorCodeInterceptor,
grpc_zap.UnaryServerInterceptor(logger, opts...),
grpc_recovery.UnaryServerInterceptor(middleware.RecoveryInterceptorOpt()),
grpcZap.UnaryServerInterceptor(logger, opts...),
grpcRecovery.UnaryServerInterceptor(middleware.RecoveryInterceptorOpt()),
)),
}

Expand Down
2 changes: 0 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -342,8 +342,6 @@ github.com/influxdata/influxdb-client-go/v2 v2.12.3 h1:28nRlNMRIV4QbtIUvxhWqaxn0
github.com/influxdata/influxdb-client-go/v2 v2.12.3/go.mod h1:IrrLUbCjjfkmRuaCiGQg4m2GbkaeJDcuWoxiWdQEbA0=
github.com/influxdata/line-protocol v0.0.0-20200327222509-2487e7298839 h1:W9WBk7wlPfJLvMCdtV4zPulc4uCPrlywQOmbFOhgQNU=
github.com/influxdata/line-protocol v0.0.0-20200327222509-2487e7298839/go.mod h1:xaLFMmpvUxqXtVkUJfg9QmT88cDaCJ3ZKgdZ78oO8Qo=
github.com/instill-ai/protogen-go v0.3.3-alpha.0.20240821064525-8e6de58da061 h1:oewJfLF4UlabEV3CLuCjLgwo1untxckrOtq9pqJitR4=
github.com/instill-ai/protogen-go v0.3.3-alpha.0.20240821064525-8e6de58da061/go.mod h1:2blmpUwiTwxIDnrjIqT6FhR5ewshZZF554wzjXFvKpQ=
github.com/instill-ai/protogen-go v0.3.3-alpha.0.20240826094216-ad773d684498 h1:uk5MtLSOAif+Y4rcY+f47LtxX2nPcIXScZaKzzBzKEE=
github.com/instill-ai/protogen-go v0.3.3-alpha.0.20240826094216-ad773d684498/go.mod h1:2blmpUwiTwxIDnrjIqT6FhR5ewshZZF554wzjXFvKpQ=
github.com/instill-ai/usage-client v0.3.0-alpha.0.20240319060111-4a3a39f2fd61 h1:smPTvmXDhn/QC7y/TPXyMTqbbRd0gvzmFgWBChwTfhE=
Expand Down
38 changes: 17 additions & 21 deletions pkg/client/grpc/pipeline_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package grpcclient
// "io"
// "os"
// "testing"
// "time"

// "github.com/instill-ai/artifact-backend/pkg/service"
// pipelinev1beta "github.com/instill-ai/protogen-go/vdp/pipeline/v1beta"
Expand Down Expand Up @@ -79,32 +80,27 @@ package grpcclient
// ctx := metadata.NewOutgoingContext(context.Background(), md)
// pipelinePublicServiceClient := pipelinev1beta.NewPipelinePublicServiceClient(pipelinePublicGrpcConn)
// req := &pipelinev1beta.TriggerNamespacePipelineReleaseRequest{
// NamespaceId: "preset",
// PipelineId: "indexing-embed",
// ReleaseId: "v1.1.1",
// Data: []*pipelinev1beta.TriggerData{
// {
// Variable: &structpb.Struct{
// Fields: map[string]*structpb.Value{
// "chunk_input": {
// Kind: &structpb.Value_StringValue{
// StringValue: "test",
// },
// },
// },
// },
// Secret: map[string]string{
// "key": "value",
// },
// },
// NamespaceId: service.NamespaceID,
// PipelineId: service.TextEmbedPipelineID,
// ReleaseId: service.TextEmbedVersion,
// Inputs: []*structpb.Struct{
// {Fields: map[string]*structpb.Value{"chunk_input": {Kind: &structpb.Value_StringValue{StringValue: "test"}}}},
// {Fields: map[string]*structpb.Value{"chunk_input": {Kind: &structpb.Value_StringValue{StringValue: "test"}}}},
// {Fields: map[string]*structpb.Value{"chunk_input": {Kind: &structpb.Value_StringValue{StringValue: "test"}}}},
// {Fields: map[string]*structpb.Value{"chunk_input": {Kind: &structpb.Value_StringValue{StringValue: "test"}}}},
// // {Fields: map[string]*structpb.Value{"chunk_input": {Kind: &structpb.Value_StringValue{StringValue: "test"}}}},
// // {Fields: map[string]*structpb.Value{"chunk_input": {Kind: &structpb.Value_StringValue{StringValue: "test"}}}},
// },
// Inputs: []*structpb.Struct{{Fields: map[string]*structpb.Value{"chunk_input": {Kind: &structpb.Value_StringValue{StringValue: "test"}}}}},
// }
// // print the time of calling pipeline
// startTime := time.Now()
// res, err := pipelinePublicServiceClient.TriggerNamespacePipelineRelease(ctx, req)
// if err != nil {
// t.Fatalf("failed to trigger pipeline: %v", err)
// }
// vector, err := service.GetVectorFromResponse(res)
// elapsedTime := time.Since(startTime)
// fmt.Println("elapsed time:", elapsedTime)
// vector, err := service.GetVectorsFromResponse(res)
// if err != nil {
// t.Fatalf("failed to trigger pipeline: %v", err)
// }
Expand Down Expand Up @@ -202,7 +198,7 @@ package grpcclient
// ctx := metadata.NewOutgoingContext(context.Background(), md)
// pipelinePublicServiceClient := pipelinev1beta.NewPipelinePublicServiceClient(pipelinePublicGrpcConn)

// mdString, err := readMdToString("../../../test_converted_pdf_.md")
// mdString, err := readMdToString("../../../test_files/test_converted_pdf_.md")
// if err != nil {
// t.Fatalf("failed to read pdf file: %v", err)
// }
Expand Down
34 changes: 17 additions & 17 deletions pkg/handler/knowledgebasefiles.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ func (ph *PublicHandler) UploadCatalogFile(ctx context.Context, req *artifactpb.
quota, humanReadable := tier.GetFileStorageTotalQuota()
if totalUsageInNamespace+fileSize > int64(quota) {
return nil, fmt.Errorf(
"file storage totalquota exceeded. max: %v. tier:%v, err: %w",
"file storage total quota exceeded. max: %v. tier:%v, err: %w",
humanReadable, tier.String(), customerror.ErrInvalidArgument)
}

Expand Down Expand Up @@ -335,13 +335,13 @@ func (ph *PublicHandler) DeleteCatalogFile(
return nil, fmt.Errorf("file uid is required. err: %w", customerror.ErrInvalidArgument)
}

fuid, err := uuid.FromString(req.FileUid)
fUID, err := uuid.FromString(req.FileUid)
if err != nil {
return nil, fmt.Errorf("failed to parse file uid. err: %w", customerror.ErrInvalidArgument)
}

// get the file by uid
files, err := ph.service.Repository.GetKnowledgeBaseFilesByFileUIDs(ctx, []uuid.UUID{fuid})
files, err := ph.service.Repository.GetKnowledgeBaseFilesByFileUIDs(ctx, []uuid.UUID{fUID})
if err != nil {
return nil, err
}
Expand All @@ -354,33 +354,33 @@ func (ph *PublicHandler) DeleteCatalogFile(
// kb file in minio
objectPaths = append(objectPaths, files[0].Destination)
// converted file in minio
cf, err := ph.service.Repository.GetConvertedFileByFileUID(ctx, fuid)
cf, err := ph.service.Repository.GetConvertedFileByFileUID(ctx, fUID)
if err == nil {
objectPaths = append(objectPaths, cf.Destination)
}
// chunks in minio
chunks, _ := ph.service.Repository.ListChunksByKbFileUID(ctx, fuid)
chunks, _ := ph.service.Repository.ListChunksByKbFileUID(ctx, fUID)
if len(chunks) > 0 {
for _, chunk := range chunks {
objectPaths = append(objectPaths, chunk.ContentDest)
}
}
// delete the embeddings in milvus(need to delete first)
embUIDs := []string{}
embs, _ := ph.service.Repository.ListEmbeddingsByKbFileUID(ctx, fuid)
for _, emb := range embs {
embeddings, _ := ph.service.Repository.ListEmbeddingsByKbFileUID(ctx, fUID)
for _, emb := range embeddings {
embUIDs = append(embUIDs, emb.UID.String())
}
_ = ph.service.MilvusClient.DeleteEmbeddingsInKb(ctx, files[0].KnowledgeBaseUID.String(), embUIDs)

_ = ph.service.MinIO.DeleteFiles(ctx, objectPaths)
// delete the converted file in postgres
_ = ph.service.Repository.HardDeleteConvertedFileByFileUID(ctx, fuid)
// delete the chunks in postgres
_ = ph.service.Repository.HardDeleteChunksByKbFileUID(ctx, fuid)
// delete the embeddings in postgres
_ = ph.service.Repository.HardDeleteEmbeddingsByKbFileUID(ctx, fuid)
// delete the file in postgres
// delete the converted file in postgreSQL
_ = ph.service.Repository.HardDeleteConvertedFileByFileUID(ctx, fUID)
// delete the chunks in postgreSQL
_ = ph.service.Repository.HardDeleteChunksByKbFileUID(ctx, fUID)
// delete the embeddings in postgreSQL
_ = ph.service.Repository.HardDeleteEmbeddingsByKbFileUID(ctx, fUID)
// delete the file in postgreSQL
err = ph.service.Repository.DeleteKnowledgeBaseFile(ctx, req.FileUid)
if err != nil {
return nil, err
Expand All @@ -401,14 +401,14 @@ func (ph *PublicHandler) ProcessCatalogFiles(ctx context.Context, req *artifactp

log, _ := logger.GetZapLogger(ctx)
// ACL - check if the uid can process file. ACL.
// chekc the fiels's kb_uid and use kb_uid to check if user has write permission
// check the file's kb_uid and use kb_uid to check if user has write permission
fileUUIDs := make([]uuid.UUID, 0, len(req.FileUids))
for _, fileUID := range req.FileUids {
fuid, err := uuid.FromString(fileUID)
fUID, err := uuid.FromString(fileUID)
if err != nil {
return nil, fmt.Errorf("failed to parse file uid. err: %w", customerror.ErrInvalidArgument)
}
fileUUIDs = append(fileUUIDs, fuid)
fileUUIDs = append(fileUUIDs, fUID)
}
kbfs, err := ph.service.Repository.GetKnowledgeBaseFilesByFileUIDs(ctx, fileUUIDs)
if err != nil {
Expand Down
Loading

0 comments on commit 12d313c

Please sign in to comment.