Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(catalog): when delete catalog and file, also delete the artifact #61

Merged
merged 1 commit into from
Aug 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 37 additions & 5 deletions pkg/handler/knowledgebase.go
Original file line number Diff line number Diff line change
Expand Up @@ -351,16 +351,48 @@ func (ph *PublicHandler) DeleteCatalog(ctx context.Context, req *artifactpb.Dele
return nil, fmt.Errorf(ErrorDeleteKnowledgeBaseMsg, customerror.ErrNoPermission)
}

deletedKb, err := ph.service.Repository.DeleteKnowledgeBase(ctx, ns.NsUID.String(), req.CatalogId)
if err != nil {
return nil, err
}
err = ph.service.ACLClient.Purge(ctx, "knowledgebase", deletedKb.UID)
err = ph.service.ACLClient.Purge(ctx, "knowledgebase", kb.UID)
if err != nil {
log.Error("failed to purge catalog", zap.Error(err))
return nil, fmt.Errorf(ErrorDeleteKnowledgeBaseMsg, err)
}

// delete collection milvus
err = ph.service.MilvusClient.DropKnowledgeBaseCollection(ctx, kb.UID.String())
if err != nil {
log.Error("failed to drop collection in milvus", zap.Error(err))
}
// delete files in minIO
err = <-ph.service.MinIO.DeleteKnowledgeBase(ctx, kb.UID.String())
if err != nil {
log.Error("failed to delete files in minIO", zap.Error(err))
}
// delete database in postgres
err = ph.service.Repository.DeleteAllKnowledgeBaseFiles(ctx, kb.UID.String())
if err != nil {
log.Error("failed to delete files in postgres", zap.Error(err))
}
// delete converted files in postgres
err = ph.service.Repository.DeleteAllConvertedFilesinKb(ctx, kb.UID)
if err != nil {
log.Error("failed to delete converted files in postgres", zap.Error(err))
}
// delete all chunks in postgres
err = ph.service.Repository.HardDeleteChunksByKbUID(ctx, kb.UID)
if err != nil {
log.Error("failed to delete chunks in postgres", zap.Error(err))
}

// delete all embedding in postgres
err = ph.service.Repository.HardDeleteEmbeddingsByKbUID(ctx, kb.UID)
if err != nil {
log.Error("failed to delete embeddings in postgres", zap.Error(err))
}

deletedKb, err := ph.service.Repository.DeleteKnowledgeBase(ctx, ns.NsUID.String(), req.CatalogId)
if err != nil {
return nil, err
}
return &artifactpb.DeleteCatalogResponse{
Catalog: &artifactpb.Catalog{
Name: deletedKb.Name,
Expand Down
33 changes: 32 additions & 1 deletion pkg/handler/knowledgebasefiles.go
Original file line number Diff line number Diff line change
Expand Up @@ -336,11 +336,42 @@ func (ph *PublicHandler) DeleteCatalogFile(
return nil, fmt.Errorf("file not found. err: %w", customerror.ErrNotFound)
}

// delete the file from minio
objectPaths := []string{}
// kb file in minio
objectPaths = append(objectPaths, files[0].Destination)
// converted file in minio
cf, err := ph.service.Repository.GetConvertedFileByFileUID(ctx, fuid)
if err == nil {
objectPaths = append(objectPaths, cf.Destination)
}
// chunks in minio
chunks, _ := ph.service.Repository.ListChunksByKbFileUID(ctx, fuid)
if len(chunks) > 0 {
for _, chunk := range chunks {
objectPaths = append(objectPaths, chunk.ContentDest)
}
}
// delete the embeddings in milvus(need to delete first)
embUIDs := []string{}
embs, _ := ph.service.Repository.ListEmbeddingsByKbFileUID(ctx, fuid)
for _, emb := range embs {
embUIDs = append(embUIDs, emb.UID.String())
}
_ = ph.service.MilvusClient.DeleteEmbeddingsInKb(ctx, files[0].KnowledgeBaseUID.String(), embUIDs)

_ = ph.service.MinIO.DeleteFiles(ctx, objectPaths)
// delete the converted file in postgres
_ = ph.service.Repository.HardDeleteConvertedFileByFileUID(ctx, fuid)
// delete the chunks in postgres
_ = ph.service.Repository.HardDeleteChunksByKbFileUID(ctx, fuid)
// delete the embeddings in postgres
_ = ph.service.Repository.HardDeleteEmbeddingsByKbFileUID(ctx, fuid)
// delete the file in postgres
err = ph.service.Repository.DeleteKnowledgeBaseFile(ctx, req.FileUid)
if err != nil {
return nil, err
}

// decrease catalog usage
err = ph.service.Repository.IncreaseKnowledgeBaseUsage(ctx, files[0].KnowledgeBaseUID.String(), int(-files[0].Size))
if err != nil {
Expand Down
15 changes: 15 additions & 0 deletions pkg/milvus/milvus.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,13 @@ type MilvusClientI interface {
InsertVectorsToKnowledgeBaseCollection(ctx context.Context, kbUID string, embeddings []Embedding) error
GetAllCollectionNames(ctx context.Context) ([]*entity.Collection, error)
DeleteCollection(ctx context.Context, collectionName string) error
// drop knowledge base collection
DropKnowledgeBaseCollection(ctx context.Context, kbUID string) error
ListEmbeddings(ctx context.Context, collectionName string) ([]Embedding, error)
SearchSimilarEmbeddings(ctx context.Context, collectionName string, vectors [][]float32, topK int) ([][]SimilarEmbedding, error)
SearchSimilarEmbeddingsInKB(ctx context.Context, kbUID string, vectors [][]float32, topK int) ([][]SimilarEmbedding, error)
DeleteEmbedding(ctx context.Context, collectionName string, embeddingUID []string) error
DeleteEmbeddingsInKb(ctx context.Context, kbUID string, embeddingUID []string) error
// GetKnowledgeBaseCollectionName returns the collection name for a knowledge base
GetKnowledgeBaseCollectionName(kbUID string) string
Close()
Expand Down Expand Up @@ -304,6 +307,12 @@ func (m *MilvusClient) DeleteEmbedding(ctx context.Context, collectionName strin
return err
}

// DeleteEmbeddingsInKb deletes the embeddings identified by embeddingUID from
// the collection whose name GetKnowledgeBaseCollectionName derives from kbUID.
// It returns whatever error DeleteEmbedding reports.
func (m *MilvusClient) DeleteEmbeddingsInKb(ctx context.Context, kbUID string, embeddingUID []string) error {
	return m.DeleteEmbedding(ctx, m.GetKnowledgeBaseCollectionName(kbUID), embeddingUID)
}

type SimilarEmbedding struct {
Embedding
Score float32
Expand Down Expand Up @@ -424,3 +433,9 @@ func (m *MilvusClient) SearchSimilarEmbeddingsInKB(ctx context.Context, kbUID st
collectionName := m.GetKnowledgeBaseCollectionName(kbUID)
return m.SearchSimilarEmbeddings(ctx, collectionName, vectors, topK)
}

// DropKnowledgeBaseCollection deletes the collection whose name
// GetKnowledgeBaseCollectionName derives from kbUID. It returns whatever error
// DeleteCollection reports.
func (m *MilvusClient) DropKnowledgeBaseCollection(ctx context.Context, kbUID string) error {
	return m.DeleteCollection(ctx, m.GetKnowledgeBaseCollectionName(kbUID))
}
4 changes: 2 additions & 2 deletions pkg/milvus/milvus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ package milvus
// t.Fatalf("Failed to get Milvus version: %v", err)
// }
// fmt.Printf("Successfully connected to Milvus! Health: %v, Version: %v\n", h, v)
// err = mc.DeleteEmbedding(context.TODO(), "kb_gary_test_kb_1", []string{"5a51f2d5-9587-4472-9be2-67fd200145f3", "uid2"})
// err = mc.DeleteEmbedding(context.TODO(), "kb_73f6b9aa_0399_4f16_bca7_6b8b75fede32", []string{"aa4631e3-f936-48e4-bc2e-06c4eb629376"})
// if err != nil {
// t.Fatalf("Failed to delete embeddings: %v", err)
// }
Expand Down Expand Up @@ -240,7 +240,7 @@ package milvus
// dummyVector[0] = 1
// topK := 2
// batchVector := [][]float32{dummyVector}
// embeddings, err := mc.SearchEmbeddings(context.TODO(), collectionName, batchVector, topK)
// embeddings, err := mc.SearchSimilarEmbeddings(context.TODO(), collectionName, batchVector, topK)
// if err != nil {
// t.Fatalf("Failed to search embeddings: %v", err)
// }
Expand Down
52 changes: 49 additions & 3 deletions pkg/minio/knowledgebase.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,21 @@ type KnowledgeBaseI interface {
GetConvertedFilePathInKnowledgeBase(kbUID, ConvertedFileUID, fileExt string) string
// GetChunkPathInKnowledgeBase returns the path of the chunk in MinIO.
GetChunkPathInKnowledgeBase(kbUID, chunkUID string) string
// DeleteKnowledgeBase deletes all files in the knowledge base.
DeleteKnowledgeBase(ctx context.Context, kbUID string) chan error
// DeleteAllConvertedFilesInKb deletes converted files in the knowledge base.
DeleteAllConvertedFilesInKb(ctx context.Context, kbUID string) chan error
// DeleteAllUploadedFilesInKb deletes uploaded files in the knowledge base.
DeleteAllUploadedFilesInKb(ctx context.Context, kbUID string) chan error
// DeleteAllChunksInKb deletes chunks in the knowledge base.
DeleteAllChunksInKb(ctx context.Context, kbUID string) chan error
}

// Path segments placed between a knowledge base UID and the object name for
// each category of object kept in MinIO.
const (
	uploadedFilePrefix  = "/uploaded-file/"
	convertedFilePrefix = "/converted-file/"
	chunkPrefix         = "/chunk/"
)

// SaveConvertedFile saves a converted file to MinIO with the appropriate MIME type.
func (m *Minio) SaveConvertedFile(ctx context.Context, kbUID, convertedFileUID, fileExt string, content []byte) error {
filePathName := m.GetConvertedFilePathInKnowledgeBase(kbUID, convertedFileUID, fileExt)
Expand Down Expand Up @@ -68,14 +81,47 @@ func (m *Minio) SaveChunks(ctx context.Context, kbUID string, chunks map[ChunkUI
return nil
}

// DeleteKnowledgeBase deletes every object whose key starts with kbUID by
// delegating to DeleteFilesWithPrefix, and hands back that call's error
// channel unchanged.
func (m *Minio) DeleteKnowledgeBase(ctx context.Context, kbUID string) chan error {
	return m.DeleteFilesWithPrefix(ctx, kbUID)
}

// DeleteAllConvertedFilesInKb deletes every object whose key starts with
// kbUID followed by convertedFilePrefix. The error channel returned by
// DeleteFilesWithPrefix is passed through unchanged.
func (m *Minio) DeleteAllConvertedFilesInKb(ctx context.Context, kbUID string) chan error {
	convertedDir := kbUID + convertedFilePrefix
	return m.DeleteFilesWithPrefix(ctx, convertedDir)
}

// DeleteAllUploadedFilesInKb deletes every object whose key starts with
// kbUID followed by uploadedFilePrefix. The error channel returned by
// DeleteFilesWithPrefix is passed through unchanged.
func (m *Minio) DeleteAllUploadedFilesInKb(ctx context.Context, kbUID string) chan error {
	uploadedDir := kbUID + uploadedFilePrefix
	return m.DeleteFilesWithPrefix(ctx, uploadedDir)
}

// DeleteAllChunksInKb deletes every object whose key starts with kbUID
// followed by chunkPrefix. The error channel returned by
// DeleteFilesWithPrefix is passed through unchanged.
func (m *Minio) DeleteAllChunksInKb(ctx context.Context, kbUID string) chan error {
	chunkDir := kbUID + chunkPrefix
	return m.DeleteFilesWithPrefix(ctx, chunkDir)
}



// GetUploadedFilePathInKnowledgeBase returns the MinIO object path of an
// uploaded file: kbUID, then uploadedFilePrefix, then dest.
func (m *Minio) GetUploadedFilePathInKnowledgeBase(kbUID, dest string) string {
	// A single return built from the shared prefix constant; the earlier
	// literal-based return duplicated it and left this line unreachable.
	return kbUID + uploadedFilePrefix + dest
}

// GetConvertedFilePathInKnowledgeBase returns the MinIO object path of a
// converted file: kbUID, then convertedFilePrefix, then ConvertedFileUID, a
// dot, and fileExt.
func (m *Minio) GetConvertedFilePathInKnowledgeBase(kbUID, ConvertedFileUID, fileExt string) string {
	// A single return built from the shared prefix constant; the earlier
	// literal-based return duplicated it and left this line unreachable.
	return kbUID + convertedFilePrefix + ConvertedFileUID + "." + fileExt
}

// GetChunkPathInKnowledgeBase returns the MinIO object path of a chunk:
// kbUID, then chunkPrefix, then chunkUID with a ".txt" suffix.
func (m *Minio) GetChunkPathInKnowledgeBase(kbUID, chunkUID string) string {
	// A single return built from the shared prefix constant; the earlier
	// literal-based return duplicated it and left this line unreachable.
	return kbUID + chunkPrefix + chunkUID + ".txt"
}
75 changes: 75 additions & 0 deletions pkg/minio/minio.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@ type MinioI interface {
UploadBase64File(ctx context.Context, filePath string, base64Content string, fileMimeType string) (err error)
// deleteFile
DeleteFile(ctx context.Context, filePath string) (err error)
// deleteFiles
DeleteFiles(ctx context.Context, filePaths []string) chan error
// deleteFilesWithPrefix
DeleteFilesWithPrefix(ctx context.Context, prefix string) chan error
// GetFile
GetFile(ctx context.Context, filePath string) ([]byte, error)
// GetFilesByPaths
Expand Down Expand Up @@ -110,6 +114,32 @@ func (m *Minio) DeleteFile(ctx context.Context, filePathName string) (err error)
return nil
}

// DeleteFiles removes the given objects from the MinIO bucket concurrently,
// one goroutine per path, and blocks until every removal has finished.
//
// The returned channel is buffered and already closed by the time the caller
// sees it. It contains one error per failed removal, or the single error from
// obtaining the logger. Callers may drain it with range; a single receive
// yields nil when nothing failed. Callers that ignore the channel are
// unaffected.
func (m *Minio) DeleteFiles(ctx context.Context, filePathNames []string) chan error {
	// One slot per path plus one spare, so the logger error can always be
	// buffered even when filePathNames is empty.
	errCh := make(chan error, len(filePathNames)+1)
	// Every sender has finished before this function returns (early return
	// or wg.Wait), so closing here can never race with a send. Without the
	// close, a receiver would block forever when no removal failed.
	defer close(errCh)

	logger, err := log.GetZapLogger(ctx)
	if err != nil {
		errCh <- err
		return errCh
	}

	// Delete the files from MinIO in parallel.
	var wg sync.WaitGroup
	for _, filePathName := range filePathNames {
		wg.Add(1)
		go func(filePathName string) {
			defer wg.Done()
			if err := m.client.RemoveObject(m.bucket, filePathName); err != nil {
				logger.Error("Failed to delete file from MinIO", zap.String("path", filePathName), zap.Error(err))
				// Never blocks: the buffer has room for every path.
				errCh <- err
			}
		}(filePathName)
	}
	wg.Wait()
	return errCh
}

func (m *Minio) GetFile(ctx context.Context, filePathName string) ([]byte, error) {
log, err := log.GetZapLogger(ctx)
if err != nil {
Expand Down Expand Up @@ -151,6 +181,7 @@ func (m *Minio) GetFilesByPaths(ctx context.Context, filePaths []string) ([]File
var wg sync.WaitGroup
files := make([]FileContent, len(filePaths))
errors := make([]error, len(filePaths))
var mu sync.Mutex

for i, path := range filePaths {
wg.Add(1)
Expand All @@ -160,7 +191,9 @@ func (m *Minio) GetFilesByPaths(ctx context.Context, filePaths []string) ([]File
obj, err := m.client.GetObject(m.bucket, filePath, minio.GetObjectOptions{})
if err != nil {
log.Error("Failed to get object from MinIO", zap.String("path", filePath), zap.Error(err))
mu.Lock()
errors[index] = err
mu.Unlock()
return
}
defer obj.Close()
Expand Down Expand Up @@ -191,3 +224,45 @@ func (m *Minio) GetFilesByPaths(ctx context.Context, filePaths []string) ([]File

return files, nil
}

// DeleteFilesWithPrefix deletes every object in the bucket whose key is
// reported by ListObjects for the given prefix.
//
// The listing is consumed completely first; then one goroutine per object
// calls RemoveObject. The returned channel carries every listing error and
// every removal error, and is closed once all removals have finished. A
// single receive therefore yields the first error, or nil after completion
// when nothing failed; range drains all errors.
//
// The channel is buffered for the worst case (every object fails), so no
// sender can block. This matters because some callers read only one value:
// with an unbuffered channel the remaining senders would block forever and
// the channel would never be closed.
func (m *Minio) DeleteFilesWithPrefix(ctx context.Context, prefix string) chan error {
	logger, err := log.GetZapLogger(ctx)
	if err != nil {
		// Buffer of one so the send cannot block before the caller has the
		// channel in hand.
		errCh := make(chan error, 1)
		errCh <- err
		close(errCh)
		return errCh
	}

	// Collect the listing up front so the error channel can be sized exactly.
	// NOTE(review): the third ListObjects argument (true) presumably requests
	// a recursive listing - confirm against the minio-go version in use.
	var (
		objectNames []string
		listErrs    []error
	)
	for object := range m.client.ListObjects(m.bucket, prefix, true, nil) {
		if object.Err != nil {
			logger.Error("Failed to list object from MinIO", zap.Error(object.Err))
			listErrs = append(listErrs, object.Err)
			continue
		}
		objectNames = append(objectNames, object.Key)
	}

	// Room for every listing error plus one potential error per removal.
	errCh := make(chan error, len(listErrs)+len(objectNames))
	for _, listErr := range listErrs {
		errCh <- listErr
	}

	// Use a WaitGroup to know when all deletions have completed.
	var wg sync.WaitGroup
	for _, objectName := range objectNames {
		wg.Add(1)
		go func(objectName string) {
			defer wg.Done()
			if err := m.client.RemoveObject(m.bucket, objectName); err != nil {
				logger.Error("Failed to delete object from MinIO", zap.String("object", objectName), zap.Error(err))
				errCh <- err
			}
		}(objectName)
	}

	// Close the channel once every removal goroutine is done, so receivers
	// observe completion.
	go func() {
		wg.Wait()
		close(errCh)
	}()

	return errCh
}
Loading
Loading