Skip to content

Commit

Permalink
feat: collect shared logic for blob storage and minio
Browse files Browse the repository at this point in the history
  • Loading branch information
chuang8511 committed Nov 14, 2024
1 parent 3b853d0 commit ad59e80
Show file tree
Hide file tree
Showing 3 changed files with 110 additions and 0 deletions.
58 changes: 58 additions & 0 deletions blobstorage/blobstorage.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package blobstorage

import (
"bytes"
"context"
"fmt"
"io"
"net/http"

"google.golang.org/grpc/metadata"
)

func UploadFile(ctx context.Context, uploadURL string, data []byte, contentType string) error {

req, err := http.NewRequestWithContext(ctx, http.MethodPut, uploadURL, nil)

if err != nil {
return fmt.Errorf("creating request: %w", err)
}

body := bytes.NewReader(data)
contentLength := int64(len(data))
req.Body = io.NopCloser(body)
req.Header = metadataToHTTPHeaders(ctx)

req.ContentLength = contentLength
req.Header.Set("Content-Type", contentType)
req.Header.Del("Authorization")

client := &http.Client{}
resp, err := client.Do(req)

if err != nil {
return fmt.Errorf("uploading blob: %w", err)
}
defer resp.Body.Close()

if resp.StatusCode != http.StatusOK {
body, _ := io.ReadAll(resp.Body)
return fmt.Errorf("upload failed with status %d: %s", resp.StatusCode, string(body))
}

return nil
}

func metadataToHTTPHeaders(ctx context.Context) http.Header {
headers := http.Header{}
md, ok := metadata.FromOutgoingContext(ctx)
if !ok {
return headers
}
for key, values := range md {
for _, value := range values {
headers.Add(key, value)
}
}
return headers
}
2 changes: 2 additions & 0 deletions blobstorage/blobstorage_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
// TODO: add test for blobstorage
package blobstorage
50 changes: 50 additions & 0 deletions minio/minio.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"context"
"encoding/json"
"errors"
"fmt"
"io"
"net"
"sync"
Expand Down Expand Up @@ -277,3 +278,52 @@ func GenerateOutputRefID(prefix string) string {
referenceUID, _ := uuid.NewV4()
return prefix + "/output/" + referenceUID.String()
}

// It's used for the operations that don't need to initialize a bucket
// We will refactor the Minio shared logic in the future
type minioClientWrapper struct {
client *miniogo.Client
}

// NewMinioClient returns a new minio client
func NewMinioClient(ctx context.Context, cfg *Config, logger *zap.Logger) (*minioClientWrapper, error) {
logger.Info("Initializing Minio client")

endpoint := net.JoinHostPort(cfg.Host, cfg.Port)
client, err := miniogo.New(endpoint, &miniogo.Options{
Creds: credentials.NewStaticV4(cfg.RootUser, cfg.RootPwd, ""),
Secure: cfg.Secure,
})
if err != nil {
logger.Error("cannot connect to minio",
zap.String("host:port", cfg.Host+":"+cfg.Port),
zap.String("user", cfg.RootUser),
zap.String("pwd", cfg.RootPwd), zap.Error(err))
return nil, err
}
return &minioClientWrapper{client: client}, nil
}

// GetFile fetches a file from minio
func (m *minioClientWrapper) GetFile(ctx context.Context, bucketName, objectPath string) (data []byte, contentType string, err error) {
object, err := m.client.GetObject(ctx, bucketName, objectPath, miniogo.GetObjectOptions{})

if err != nil {
return nil, "", fmt.Errorf("get object: %w", err)
}

defer object.Close()

info, err := object.Stat()

if err != nil {
return nil, "", fmt.Errorf("get object info: %w", err)
}
data, err = io.ReadAll(object)

if err != nil {
return nil, "", fmt.Errorf("read object: %w", err)
}

return data, info.ContentType, nil
}

0 comments on commit ad59e80

Please sign in to comment.