diff --git a/pkg/acl/acl.go b/pkg/acl/acl.go index 0f90af8..1e8db59 100644 --- a/pkg/acl/acl.go +++ b/pkg/acl/acl.go @@ -369,7 +369,7 @@ func (c *ACLClient) ListPermissions(ctx context.Context, objectType string, role Relation: role, Type: objectType, }) - // TODO: handle error when no model is created + // TODO: handle error when no auth model is created if err != nil { if statusErr, ok := status.FromError(err); ok { if statusErr.Code() == codes.Code(openfga.ErrorCode_type_not_found) { diff --git a/pkg/handler/knowledgebasefiles.go b/pkg/handler/knowledgebasefiles.go index 4d95e68..f9759bd 100644 --- a/pkg/handler/knowledgebasefiles.go +++ b/pkg/handler/knowledgebasefiles.go @@ -6,16 +6,18 @@ import ( "strings" "github.com/gofrs/uuid" + "go.uber.org/zap" + "google.golang.org/protobuf/types/known/timestamppb" + "gorm.io/gorm" + "github.com/instill-ai/artifact-backend/pkg/constant" "github.com/instill-ai/artifact-backend/pkg/customerror" - "github.com/instill-ai/artifact-backend/pkg/logger" // Add this import + "github.com/instill-ai/artifact-backend/pkg/logger" "github.com/instill-ai/artifact-backend/pkg/repository" "github.com/instill-ai/artifact-backend/pkg/resource" "github.com/instill-ai/artifact-backend/pkg/utils" + artifactpb "github.com/instill-ai/protogen-go/artifact/artifact/v1alpha" - "go.uber.org/zap" - "google.golang.org/protobuf/types/known/timestamppb" - "gorm.io/gorm" ) func (ph *PublicHandler) UploadCatalogFile(ctx context.Context, req *artifactpb.UploadCatalogFileRequest) (*artifactpb.UploadCatalogFileResponse, error) { @@ -29,7 +31,11 @@ func (ph *PublicHandler) UploadCatalogFile(ctx context.Context, req *artifactpb. if err != nil { return nil, err } - + // check file name length based on character count + if len(req.File.Name) > 255 { + return nil, fmt.Errorf("file name is too long. max length is 255. name: %s err: %w", + req.File.Name, customerror.ErrInvalidArgument) + } // determine the file type by its extension req.File.Type = DetermineFileType(req.File.Name) if req.File.Type == artifactpb.FileType_FILE_TYPE_UNSPECIFIED { diff --git a/pkg/handler/knowledgebasefiles_test.go b/pkg/handler/knowledgebasefiles_test.go new file mode 100644 index 0000000..14c49f2 --- /dev/null +++ b/pkg/handler/knowledgebasefiles_test.go @@ -0,0 +1,20 @@ +package handler + +import ( + "fmt" + "testing" + "unicode/utf8" +) + +func TestUploadCatalogFile(t *testing.T) { + // Check rune count + input := "-" + + actual := utf8.RuneCountInString(input) + // print actual + fmt.Println(actual) + + // check string length + expected := len(input) + fmt.Println(expected) +} diff --git a/pkg/milvus/milvus.go b/pkg/milvus/milvus.go index 9ad5f45..5802fe6 100644 --- a/pkg/milvus/milvus.go +++ b/pkg/milvus/milvus.go @@ -144,14 +144,17 @@ func (m *MilvusClient) CreateKnowledgeBaseCollection(ctx context.Context, kbUID // InsertVectorsToKnowledgeBaseCollection func (m *MilvusClient) InsertVectorsToKnowledgeBaseCollection(ctx context.Context, kbUID string, embeddings []Embedding) error { + logger, _ := logger.GetZapLogger(ctx) collectionName := m.GetKnowledgeBaseCollectionName(kbUID) // Check if the collection exists has, err := m.c.HasCollection(ctx, collectionName) if err != nil { + logger.Error("Failed to check collection existence", zap.Error(err)) return fmt.Errorf("failed to check collection existence: %w", err) } if !has { + logger.Error("Collection does not exist", zap.String("collection", collectionName)) return fmt.Errorf("collection %s does not exist", collectionName) } @@ -180,18 +183,36 @@ func (m *MilvusClient) InsertVectorsToKnowledgeBaseCollection(ctx context.Contex entity.NewColumnFloatVector(KbCollectionFiledEmbedding, VectorDim, vectors), } - // Insert the data - _, err = m.c.Upsert(ctx, collectionName, "", columns...) + // Insert the data with retry + maxRetries := 3 + for attempt := 1; attempt <= maxRetries; attempt++ { + _, err = m.c.Upsert(ctx, collectionName, "", columns...) + if err == nil { + break + } + logger.Warn("Failed to insert vectors, retrying", zap.Int("attempt", attempt), zap.Error(err)) + time.Sleep(time.Second * time.Duration(attempt)) + } if err != nil { + logger.Error("Failed to insert vectors after retries", zap.Error(err)) return fmt.Errorf("failed to insert vectors: %w", err) } - // Optionally, you can flush the collection to ensure the data is persisted - err = m.c.Flush(ctx, collectionName, false) + // Flush the collection with retry + for attempt := 1; attempt <= maxRetries; attempt++ { + err = m.c.Flush(ctx, collectionName, false) + if err == nil { + break + } + logger.Warn("Failed to flush collection, retrying", zap.Int("attempt", attempt), zap.Error(err)) + time.Sleep(time.Second * time.Duration(attempt)) + } if err != nil { + logger.Error("Failed to flush collection after retries", zap.Error(err)) return fmt.Errorf("failed to flush collection after insertion: %w", err) } + logger.Info("Successfully inserted and flushed vectors", zap.String("collection", collectionName)) return nil } diff --git a/pkg/minio/knowledgebase.go b/pkg/minio/knowledgebase.go index 9689759..d0d9343 100644 --- a/pkg/minio/knowledgebase.go +++ b/pkg/minio/knowledgebase.go @@ -6,7 +6,9 @@ import ( "fmt" "sync" + "github.com/instill-ai/artifact-backend/pkg/logger" "github.com/instill-ai/artifact-backend/pkg/utils" + "go.uber.org/zap" ) // KnowledgeBaseI is the interface for knowledge base related operations. @@ -56,8 +58,13 @@ type ChunkContentType []byte // SaveTextChunks saves batch of chunks(text files) to MinIO. func (m *Minio) SaveTextChunks(ctx context.Context, kbUID string, chunks map[ChunkUIDType]ChunkContentType) error { + logger, _ := logger.GetZapLogger(ctx) var wg sync.WaitGroup - errorUIDChan := make(chan string, len(chunks)) + type ChunkError struct { + ChunkUID string + ErrorMessage string + } + errorUIDChan := make(chan ChunkError, len(chunks)) for chunkUID, chunkContent := range chunks { wg.Add(1) go utils.GoRecover(func() { @@ -67,7 +74,8 @@ func (m *Minio) SaveTextChunks(ctx context.Context, kbUID string, chunks map[Chu err := m.UploadBase64File(ctx, filePathName, base64.StdEncoding.EncodeToString(chunkContent), "text/plain") if err != nil { - errorUIDChan <- string(chunkUID) + logger.Error("Failed to upload chunk after retries", zap.String("chunkUID", string(chunkUID)), zap.Error(err)) + errorUIDChan <- ChunkError{ChunkUID: string(chunkUID), ErrorMessage: err.Error()} return } }(chunkUID, chunkContent) @@ -75,11 +83,12 @@ func (m *Minio) SaveTextChunks(ctx context.Context, kbUID string, chunks map[Chu } wg.Wait() close(errorUIDChan) - var errStr []string + var errStr []ChunkError for err := range errorUIDChan { errStr = append(errStr, err) } if len(errStr) > 0 { + logger.Error("Failed to upload chunks", zap.Any("ChunkError", errStr)) return fmt.Errorf("failed to upload chunks: %v", errStr) } return nil diff --git a/pkg/minio/minio.go b/pkg/minio/minio.go index a5a14a6..3cd816e 100644 --- a/pkg/minio/minio.go +++ b/pkg/minio/minio.go @@ -9,6 +9,7 @@ import ( "path/filepath" "strings" "sync" + "time" "github.com/instill-ai/artifact-backend/config" log "github.com/instill-ai/artifact-backend/pkg/logger" @@ -92,9 +93,16 @@ func (m *Minio) UploadBase64File(ctx context.Context, filePathName string, base6 // Upload the content to MinIO size := int64(len(decodedContent)) // Create the file path with folder structure - _, err = m.client.PutObjectWithContext(ctx, m.bucket, filePathName, contentReader, size, minio.PutObjectOptions{ContentType: fileMimeType}) + for i := 0; i < 3; i++ { + _, err = m.client.PutObjectWithContext(ctx, m.bucket, filePathName, contentReader, size, minio.PutObjectOptions{ContentType: fileMimeType}) + if err == nil { + break + } + log.Error("Failed to upload file to MinIO, retrying...", zap.String("attempt", fmt.Sprintf("%d", i+1)), zap.Error(err)) + time.Sleep(1 * time.Second) + } if err != nil { - log.Error("Failed to upload file to MinIO", zap.Error(err)) + log.Error("Failed to upload file to MinIO after retries", zap.Error(err)) return err } return nil @@ -107,7 +115,14 @@ func (m *Minio) DeleteFile(ctx context.Context, filePathName string) (err error) return err } // Delete the file from MinIO - err = m.client.RemoveObject(m.bucket, filePathName) + for attempt := 1; attempt <= 3; attempt++ { + err = m.client.RemoveObject(m.bucket, filePathName) + if err == nil { + break + } + log.Error("Failed to delete file from MinIO, retrying...", zap.String("filePathName", filePathName), zap.Int("attempt", attempt), zap.Error(err)) + time.Sleep(time.Duration(attempt) * time.Second) + } if err != nil { log.Error("Failed to delete file from MinIO", zap.Error(err)) return err @@ -132,7 +147,15 @@ func (m *Minio) DeleteFiles(ctx context.Context, filePathNames []string) chan er func() { func(filePathName string, errCh chan error) { defer wg.Done() - err := m.client.RemoveObject(m.bucket, filePathName) + var err error + for attempt := 1; attempt <= 3; attempt++ { + err = m.client.RemoveObject(m.bucket, filePathName) + if err == nil { + break + } + log.Error("Failed to delete file from MinIO, retrying...", zap.String("filePathName", filePathName), zap.Int("attempt", attempt), zap.Error(err)) + time.Sleep(time.Duration(attempt) * time.Second) + } if err != nil { log.Error("Failed to delete file from MinIO", zap.Error(err)) errCh <- err @@ -151,10 +174,18 @@ func (m *Minio) GetFile(ctx context.Context, filePathName string) ([]byte, error return nil, err } - // Get the object using the client - object, err := m.client.GetObject(m.bucket, filePathName, minio.GetObjectOptions{}) + // Get the object using the client with three attempts and proper time delay + var object *minio.Object + for attempt := 1; attempt <= 3; attempt++ { + object, err = m.client.GetObject(m.bucket, filePathName, minio.GetObjectOptions{}) + if err == nil { + break + } + log.Error("Failed to get file from MinIO, retrying...", zap.String("filePathName", filePathName), zap.Int("attempt", attempt), zap.Error(err)) + time.Sleep(time.Duration(attempt) * time.Second) + } if err != nil { - log.Error("Failed to get file from MinIO", zap.Error(err)) + log.Error("Failed to get file from MinIO after 3 attempts", zap.String("filePathName", filePathName), zap.Error(err)) return nil, err } defer object.Close() @@ -184,21 +215,27 @@ func (m *Minio) GetFilesByPaths(ctx context.Context, filePaths []string) ([]File } var wg sync.WaitGroup - files := make([]FileContent, len(filePaths)) - errors := make([]error, len(filePaths)) - var mu sync.Mutex + fileCh := make(chan FileContent, len(filePaths)) + errorCh := make(chan error, len(filePaths)) - for i, path := range filePaths { + for _, path := range filePaths { wg.Add(1) go utils.GoRecover(func() { - func(index int, filePath string) { + func(filePath string) { defer wg.Done() - obj, err := m.client.GetObject(m.bucket, filePath, minio.GetObjectOptions{}) + var obj *minio.Object + var err error + for attempt := 1; attempt <= 3; attempt++ { + obj, err = m.client.GetObject(m.bucket, filePath, minio.GetObjectOptions{}) + if err == nil { + break + } + log.Error("Failed to get object from MinIO, retrying...", zap.String("path", filePath), zap.Int("attempt", attempt), zap.Error(err)) + time.Sleep(time.Duration(attempt) * time.Second) + } if err != nil { log.Error("Failed to get object from MinIO", zap.String("path", filePath), zap.Error(err)) - mu.Lock() - errors[index] = err - mu.Unlock() + errorCh <- err return } defer obj.Close() @@ -207,22 +244,28 @@ func (m *Minio) GetFilesByPaths(ctx context.Context, filePaths []string) ([]File _, err = io.Copy(&buffer, obj) if err != nil { log.Error("Failed to read object content", zap.String("path", filePath), zap.Error(err)) - errors[index] = err + errorCh <- err return } - - files[index] = FileContent{ + fileCh <- FileContent{ Name: filepath.Base(filePath), Content: buffer.Bytes(), } - }(i, path) + }(path) }, fmt.Sprintf("GetFilesByPaths %s", path)) } wg.Wait() + close(fileCh) + close(errorCh) + + var files []FileContent + for file := range fileCh { + files = append(files, file) + } // Check if any errors occurred - for _, err := range errors { + for err := range errorCh { if err != nil { return nil, err } @@ -257,7 +300,15 @@ func (m *Minio) DeleteFilesWithPrefix(ctx context.Context, prefix string) chan e go utils.GoRecover(func() { func(objectName string) { defer wg.Done() - err := m.client.RemoveObject(m.bucket, objectName) + var err error + for attempt := 1; attempt <= 3; attempt++ { + err = m.client.RemoveObject(m.bucket, objectName) + if err == nil { + break + } + log.Error("Failed to delete object from MinIO, retrying...", zap.String("object", objectName), zap.Int("attempt", attempt), zap.Error(err)) + time.Sleep(time.Duration(attempt) * time.Second) + } if err != nil { log.Error("Failed to delete object from MinIO", zap.String("object", objectName), zap.Error(err)) errCh <- err diff --git a/pkg/service/mgmt.go b/pkg/service/mgmt.go index 44c8aa9..0fe1995 100644 --- a/pkg/service/mgmt.go +++ b/pkg/service/mgmt.go @@ -45,7 +45,6 @@ func (s *Service) GetNamespaceByNsID(ctx context.Context, nsID string) (*resourc return &ns, nil } -// TODO: GetNamespaceTierByNsID: in the future, this logic should be removed in CE. Because CE does not have subscription // GetNamespaceTierByNsID returns the tier of the namespace given the namespace ID func (s *Service) GetNamespaceTierByNsID(ctx context.Context, nsID string) (Tier, error) { ns, err := s.GetNamespaceByNsID(ctx, nsID) @@ -55,7 +54,6 @@ func (s *Service) GetNamespaceTierByNsID(ctx context.Context, nsID string) (Tier return s.GetNamespaceTier(ctx, ns) } -// TODO: GetNamespaceTier: in the future, this logic should be removed in CE. Because CE does not have subscription func (s *Service) GetNamespaceTier(ctx context.Context, ns *resource.Namespace) (Tier, error) { log, _ := logger.GetZapLogger(ctx) switch ns.NsType { diff --git a/pkg/service/pipeline.go b/pkg/service/pipeline.go index e263a84..f1585fa 100644 --- a/pkg/service/pipeline.go +++ b/pkg/service/pipeline.go @@ -359,11 +359,11 @@ func GetVectorsFromResponse(resp *pipelinePb.TriggerNamespacePipelineReleaseResp for _, output := range resp.Outputs { embedResult, ok := output.GetFields()["embed_result"] if !ok { - return nil, fmt.Errorf("embed_result not found in the output fields. resp: %v", resp) + return nil, fmt.Errorf("embed_result not found in the output fields. output: %v", output) } listValue := embedResult.GetListValue() if listValue == nil { - return nil, fmt.Errorf("embed_result is not a list. resp: %v", resp) + return nil, fmt.Errorf("embed_result is not a list. output: %v", output) } vector := make([]float32, 0, len(listValue.GetValues())) diff --git a/pkg/usage/usage.go b/pkg/usage/usage.go index 8138f45..11a68f5 100644 --- a/pkg/usage/usage.go +++ b/pkg/usage/usage.go @@ -90,7 +90,7 @@ func (u *usage) RetrieveArtifactUsageData() interface{} { // Roll all artifact resources on a user // for _, user := range userResp.GetUsers() { - //TODO: implement the logic to retrieve the artifact usage data + //TODO: implement the logic to retrieve the app usage data // } if userResp.NextPageToken == "" { @@ -115,7 +115,7 @@ func (u *usage) RetrieveArtifactUsageData() interface{} { // Roll all artifact resources on an org // for _, org := range orgResp.GetOrganizations() { - //TODO: implement the logic to retrieve the artifact usage data + //TODO: implement the logic to retrieve the app usage data // } if orgResp.NextPageToken == "" { diff --git a/pkg/worker/worker.go b/pkg/worker/worker.go index 7a4592d..5c6b116 100644 --- a/pkg/worker/worker.go +++ b/pkg/worker/worker.go @@ -663,7 +663,7 @@ func (wp *fileToEmbWorkerPool) processEmbeddingFile(ctx context.Context, file re sourceTable, sourceUID, chunks, _, texts, err := wp.svc.GetChunksByFile(ctx, &file) if err != nil { logger.Error("Failed to get chunks from database first time.", zap.String("SourceUID", sourceUID.String())) - // TODO: investigate minIO failure. Last-Modified time format not recognized. Please report this issue at https://github.com/minio/minio-go/issues. + // TODO: investigate minIO failure. Ref: Last-Modified time format not recognized. Please report this issue at https://github.com/minio/minio-go/issues. // retry once when get chunks failed time.Sleep(1 * time.Second) logger.Info("Retrying to get chunks from database.", zap.String("SourceUID", sourceUID.String()))