Skip to content

Commit

Permalink
fix(artifact): improve catalog deletion slow issue (#93)
Browse files Browse the repository at this point in the history
Because

deleting a catalog with too many or huge files will be extremely slow. 

This commit

makes the deletion in the background and marks the catalog as deleted.
  • Loading branch information
Yougigun authored Sep 13, 2024
1 parent 1b135e1 commit 0bbf3f2
Show file tree
Hide file tree
Showing 13 changed files with 795 additions and 292 deletions.
7 changes: 5 additions & 2 deletions cmd/main/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,7 @@ func main() {
privatePort := fmt.Sprintf(":%d", config.Config.Server.PrivatePort)
// Wait for interrupt signal to gracefully shutdown the server with a timeout of 5 seconds.
errSig := make(chan error)
defer close(errSig)

go func() {
privateListener, err := net.Listen("tcp", privatePort)
Expand Down Expand Up @@ -266,21 +267,23 @@ func main() {
// kill -2 is syscall.SIGINT
// kill -9 is syscall.SIGKILL but can't be catch, so don't need add it
quitSig := make(chan os.Signal, 1)
defer close(quitSig)
signal.Notify(quitSig, syscall.SIGINT, syscall.SIGTERM)

select {
case err := <-errSig:
logger.Error(fmt.Sprintf("Fatal error: %v\n", err))
os.Exit(1)
case <-quitSig:
// if config.Config.Server.Usage.Enabled && usg != nil {
// usg.TriggerSingleReporter(ctx)
// }
logger.Info("Shutting down server...")
publicGrpcS.GracefulStop()
wp.GraceFulStop()
logger.Info("server shutdown 1")
logger.Info("server shutdown due to signal")
os.Exit(0)
}
fmt.Println("server shutdown 2")
}

func newClients(ctx context.Context, logger *zap.Logger) (
Expand Down
3 changes: 1 addition & 2 deletions pkg/handler/catalog.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,7 @@ func (ph *PublicHandler) GetFileCatalog(ctx context.Context, req *artifactpb.Get
if err != nil {
log.Error("failed to get file by file uid", zap.Error(err))
return nil, fmt.Errorf("failed to get file by file uid. err: %w", err)
}
if len(kbfs) == 0 {
} else if len(kbfs) == 0 {
log.Error("no file found by file uid", zap.String("file_uid", fileUID.String()))
return nil, fmt.Errorf("no file found by file uid: %s", fileUID.String())
}
Expand Down
3 changes: 1 addition & 2 deletions pkg/handler/chunks.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,7 @@ func (ph *PublicHandler) ListChunks(ctx context.Context, req *artifactpb.ListChu
if err != nil {
log.Error("failed to get knowledge base files by file uids", zap.Error(err))
return nil, fmt.Errorf("failed to get knowledge base files by file uids")
}
if len(kbfs) == 0 {
} else if len(kbfs) == 0 {
log.Error("no files found for the given file uids")
return nil, fmt.Errorf("no files found for the given file uids: %v. err: %w", fileUIDs, customerror.ErrNotFound)
}
Expand Down
104 changes: 67 additions & 37 deletions pkg/handler/knowledgebase.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/instill-ai/artifact-backend/pkg/logger"
"github.com/instill-ai/artifact-backend/pkg/repository"
"github.com/instill-ai/artifact-backend/pkg/service"
"github.com/instill-ai/artifact-backend/pkg/utils"
artifactpb "github.com/instill-ai/protogen-go/artifact/artifact/v1alpha"
)

Expand Down Expand Up @@ -347,50 +348,79 @@ func (ph *PublicHandler) DeleteCatalog(ctx context.Context, req *artifactpb.Dele
return nil, fmt.Errorf(ErrorDeleteKnowledgeBaseMsg, customerror.ErrNoPermission)
}

// delete acl
err = ph.service.ACLClient.Purge(ctx, "knowledgebase", kb.UID)
if err != nil {
log.Error("failed to purge catalog", zap.Error(err))
return nil, fmt.Errorf(ErrorDeleteKnowledgeBaseMsg, err)
}
startSignal := make(chan bool)
// TODO: in the future, we should delete the catalog using clean up worker
go utils.GoRecover(func() {
ctx := context.TODO()
log, _ := logger.GetZapLogger(ctx)
// wait for the catalog to be deleted in postgres
canStart := <-startSignal
if !canStart {
log.Error("failed to delete catalog in background", zap.String("catalog_id", kb.UID.String()))
return
}
log.Info("DeleteCatalog starts in background", zap.String("catalog_id", kb.UID.String()))
allPass := true
// delete files in minIO
err = <-ph.service.MinIO.DeleteKnowledgeBase(ctx, kb.UID.String())
if err != nil {
log.Error("failed to delete files in minIO in background", zap.Error(err))
allPass = false
}

// delete files in minIO
err = <-ph.service.MinIO.DeleteKnowledgeBase(ctx, kb.UID.String())
if err != nil {
log.Error("failed to delete files in minIO", zap.Error(err))
}
// delete database in postgres
err = ph.service.Repository.DeleteAllKnowledgeBaseFiles(ctx, kb.UID.String())
if err != nil {
log.Error("failed to delete files in postgres", zap.Error(err))
}
// delete converted files in postgres
err = ph.service.Repository.DeleteAllConvertedFilesinKb(ctx, kb.UID)
if err != nil {
log.Error("failed to delete converted files in postgres", zap.Error(err))
}
// delete all chunks in postgres
err = ph.service.Repository.HardDeleteChunksByKbUID(ctx, kb.UID)
if err != nil {
log.Error("failed to delete chunks in postgres", zap.Error(err))
}
// delete the collection in milvus
err = ph.service.MilvusClient.DropKnowledgeBaseCollection(ctx, kb.UID.String())
if err != nil {
log.Error("failed to delete collection in milvus in background", zap.Error(err))
allPass = false
}

// delete all embedding in postgres
err = ph.service.Repository.HardDeleteEmbeddingsByKbUID(ctx, kb.UID)
if err != nil {
log.Error("failed to delete embeddings in postgres", zap.Error(err))
}
// delete all files in postgres
err = ph.service.Repository.DeleteAllKnowledgeBaseFiles(ctx, kb.UID.String())
if err != nil {
log.Error("failed to delete files in postgres in background", zap.Error(err))
allPass = false
}
// delete converted files in postgres
err = ph.service.Repository.DeleteAllConvertedFilesInKb(ctx, kb.UID)
if err != nil {
log.Error("failed to delete converted files in postgres in background", zap.Error(err))
allPass = false
}
// delete all chunks in postgres
err = ph.service.Repository.HardDeleteChunksByKbUID(ctx, kb.UID)
if err != nil {
log.Error("failed to delete chunks in postgres in background", zap.Error(err))
allPass = false
}

// delete all embedding in postgres
err = ph.service.Repository.HardDeleteEmbeddingsByKbUID(ctx, kb.UID)
if err != nil {
log.Error("failed to delete embeddings in postgres in background", zap.Error(err))
allPass = false
}
// delete acl. Note: we need to delete the acl after deleting the catalog
err = ph.service.ACLClient.Purge(ctx, "knowledgebase", kb.UID)
if err != nil {
log.Error("failed to purge catalog", zap.Error(err))
allPass = false
}
if allPass {
log.Info("successfully deleted catalog in background", zap.String("catalog_id", kb.UID.String()))
} else {
log.Error("failed to delete catalog in background", zap.String("catalog_id", kb.UID.String()))
}
}, "DeleteCatalog")

deletedKb, err := ph.service.Repository.DeleteKnowledgeBase(ctx, ns.NsUID.String(), req.CatalogId)
if err != nil {
log.Error("failed to delete catalog", zap.Error(err))
startSignal <- false
return nil, err
}

// delete acl. Note: we need to delete the acl after deleting the catalog
err = ph.service.ACLClient.Purge(ctx, "knowledgebase", kb.UID)
if err != nil {
log.Error("failed to purge catalog", zap.Error(err))
}
// start the background deletion
startSignal <- true

return &artifactpb.DeleteCatalogResponse{
Catalog: &artifactpb.Catalog{
Expand Down
119 changes: 81 additions & 38 deletions pkg/handler/knowledgebasefiles.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
artifactpb "github.com/instill-ai/protogen-go/artifact/artifact/v1alpha"
"go.uber.org/zap"
"google.golang.org/protobuf/types/known/timestamppb"
"gorm.io/gorm"
)

func (ph *PublicHandler) UploadCatalogFile(ctx context.Context, req *artifactpb.UploadCatalogFileRequest) (*artifactpb.UploadCatalogFileResponse, error) {
Expand Down Expand Up @@ -135,8 +136,9 @@ func (ph *PublicHandler) UploadCatalogFile(ctx context.Context, req *artifactpb.
return nil, err
}

// increase catalog usage
err = ph.service.Repository.IncreaseKnowledgeBaseUsage(ctx, kb.UID.String(), int(fileSize))
// increase catalog usage. need to increase after the file is created.
// Note: in the future, we need to increase the usage in transaction with creating the file.
err = ph.service.Repository.IncreaseKnowledgeBaseUsage(ctx, nil, kb.UID.String(), int(fileSize))
if err != nil {
log.Error("failed to increase catalog usage", zap.Error(err))
return nil, err
Expand Down Expand Up @@ -318,14 +320,16 @@ func (ph *PublicHandler) DeleteCatalogFile(

// ACL - check user's permission to write catalog of kb file
kbfs, err := ph.service.Repository.GetKnowledgeBaseFilesByFileUIDs(ctx, []uuid.UUID{uuid.FromStringOrNil(req.FileUid)})
if err != nil && len(kbfs) == 0 {
log.Error("failed to get catalog", zap.Error(err))
return nil, fmt.Errorf(ErrorListKnowledgeBasesMsg, err)
if err != nil {
log.Error("failed to get catalog files", zap.Error(err))
return nil, fmt.Errorf("failed to get catalog files. err: %w", err)
} else if len(kbfs) == 0 {
return nil, fmt.Errorf("file not found. err: %w", customerror.ErrNotFound)
}
granted, err := ph.service.ACLClient.CheckPermission(ctx, "knowledgebase", kbfs[0].KnowledgeBaseUID, "writer")
if err != nil {
log.Error("failed to check permission", zap.Error(err))
return nil, fmt.Errorf(ErrorUpdateKnowledgeBaseMsg, err)
return nil, fmt.Errorf("failed to check permission. err: %w", err)
}
if !granted {
log.Error("no permission to delete catalog file")
Expand All @@ -345,66 +349,106 @@ func (ph *PublicHandler) DeleteCatalogFile(
files, err := ph.service.Repository.GetKnowledgeBaseFilesByFileUIDs(ctx, []uuid.UUID{fUID})
if err != nil {
return nil, err
}
if len(files) == 0 {
} else if len(files) == 0 {
return nil, fmt.Errorf("file not found. err: %w", customerror.ErrNotFound)
}

startSignal := make(chan bool)
// TODO: need to use clean worker in the future
go utils.GoRecover(
func() {
// to prevent parent context from being cancelled, create a new context
// Create a new context to prevent the parent context from being cancelled
ctx := context.TODO()
// delete the file from minio
log, _ := logger.GetZapLogger(ctx)
canStart := <-startSignal
if !canStart {
log.Info("DeleteCatalogFile: received stop signal")
return
}
log.Info("DeleteCatalogFile: start deleting file from minio, database and milvus")
allPass := true
// Delete the file from MinIO
objectPaths := []string{}
// kb file in minio
// Add the knowledge base file in MinIO to the list of objects to delete
objectPaths = append(objectPaths, files[0].Destination)
// converted file in minio
// Add the converted file in MinIO to the list of objects to delete
cf, err := ph.service.Repository.GetConvertedFileByFileUID(ctx, fUID)
if err == nil {
if err != nil {
if err != gorm.ErrRecordNotFound {
log.Error("failed to get converted file by file uid", zap.Error(err))
allPass = false
}
} else if cf != nil {
objectPaths = append(objectPaths, cf.Destination)
}
// chunks in minio
chunks, _ := ph.service.Repository.ListChunksByKbFileUID(ctx, fUID)
if len(chunks) > 0 {
// Add the chunks in MinIO to the list of objects to delete
chunks, err := ph.service.Repository.ListChunksByKbFileUID(ctx, fUID)
if err != nil {
log.Error("failed to get chunks by kb file uid", zap.Error(err))
allPass = false
} else if len(chunks) > 0 {
for _, chunk := range chunks {
objectPaths = append(objectPaths, chunk.ContentDest)
}
}
// delete the embeddings in milvus(need to delete first)
// Delete the embeddings in Milvus (this better to be done first)
embUIDs := []string{}
embeddings, _ := ph.service.Repository.ListEmbeddingsByKbFileUID(ctx, fUID)
for _, emb := range embeddings {
embUIDs = append(embUIDs, emb.UID.String())
}
_ = ph.service.MilvusClient.DeleteEmbeddingsInKb(ctx, files[0].KnowledgeBaseUID.String(), embUIDs)

_ = ph.service.MinIO.DeleteFiles(ctx, objectPaths)
// delete the converted file in postgreSQL
_ = ph.service.Repository.HardDeleteConvertedFileByFileUID(ctx, fUID)
// delete the chunks in postgreSQL
_ = ph.service.Repository.HardDeleteChunksByKbFileUID(ctx, fUID)
// delete the embeddings in postgreSQL
_ = ph.service.Repository.HardDeleteEmbeddingsByKbFileUID(ctx, fUID)
// print success message and file uid
log.Info("Successfully deleted file from minio, database and milvus", zap.String("file_uid", fUID.String()))
err = ph.service.MilvusClient.DeleteEmbeddingsInKb(ctx, files[0].KnowledgeBaseUID.String(), embUIDs)
if err != nil {
log.Error("failed to delete embeddings in milvus", zap.Error(err))
allPass = false
}

// Delete the files in MinIO
errChan := ph.service.MinIO.DeleteFiles(ctx, objectPaths)
for err := range errChan {
if err != nil {
log.Error("failed to delete files in minio", zap.Error(err))
allPass = false
}
}
// Delete the converted file in PostgreSQL
err = ph.service.Repository.HardDeleteConvertedFileByFileUID(ctx, fUID)
if err != nil {
log.Error("failed to delete converted file in postgreSQL", zap.Error(err))
allPass = false
}
// Delete the chunks in PostgreSQL
err = ph.service.Repository.HardDeleteChunksByKbFileUID(ctx, fUID)
if err != nil {
log.Error("failed to delete chunks in postgreSQL", zap.Error(err))
allPass = false
}
// Delete the embeddings in PostgreSQL
err = ph.service.Repository.HardDeleteEmbeddingsByKbFileUID(ctx, fUID)
if err != nil {
log.Error("failed to delete embeddings in postgreSQL", zap.Error(err))
allPass = false
}
if allPass {
log.Info("DeleteCatalogFile: successfully deleted file from minio, database and milvus", zap.String("file_uid", fUID.String()))
} else {
log.Error("DeleteCatalogFile: failed to delete file from minio, database and milvus", zap.String("file_uid", fUID.String()))
}
},
"DeleteCatalogFile",
)

// delete the file in postgreSQL
err = ph.service.Repository.DeleteKnowledgeBaseFile(ctx, req.FileUid)
if err != nil {
return nil, err
}
// decrease catalog usage
err = ph.service.Repository.IncreaseKnowledgeBaseUsage(ctx, files[0].KnowledgeBaseUID.String(), int(-files[0].Size))
err = ph.service.Repository.DeleteKnowledgeBaseFileAndDecreaseUsage(ctx, fUID)
if err != nil {
log.Error("failed to delete knowledge base file and decrease usage", zap.Error(err))
startSignal <- false
return nil, err
}
// start the background deletion
startSignal <- true

return &artifactpb.DeleteCatalogFileResponse{
FileUid: req.FileUid,
FileUid: fUID.String(),
}, nil

}
Expand All @@ -425,8 +469,7 @@ func (ph *PublicHandler) ProcessCatalogFiles(ctx context.Context, req *artifactp
kbfs, err := ph.service.Repository.GetKnowledgeBaseFilesByFileUIDs(ctx, fileUUIDs)
if err != nil {
return nil, err
}
if len(kbfs) == 0 {
} else if len(kbfs) == 0 {
return nil, fmt.Errorf("file not found. err: %w", customerror.ErrNotFound)
}
// check write permission for the catalog
Expand Down
Loading

0 comments on commit 0bbf3f2

Please sign in to comment.