Skip to content

Commit

Permalink
implement ContentServerDataStore for azure blob storage too
Browse files Browse the repository at this point in the history
  • Loading branch information
xelat09 committed Jan 5, 2025
1 parent e20b174 commit 2c0e5df
Show file tree
Hide file tree
Showing 3 changed files with 92 additions and 0 deletions.
68 changes: 68 additions & 0 deletions pkg/azurestore/azureservice.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@ import (
"errors"
"fmt"
"io"
"net/http"
"sort"
"strconv"
"strings"

"github.com/Azure/azure-sdk-for-go/sdk/azcore"
Expand Down Expand Up @@ -67,6 +69,8 @@ type AzBlob interface {
Upload(ctx context.Context, body io.ReadSeeker) error
// Download returns a readcloser to download the contents of the blob
Download(ctx context.Context) (io.ReadCloser, error)
// Serves the contents of the blob directly handling HTTP Range requests if set
ServeContent(ctx context.Context, w http.ResponseWriter, r *http.Request) error
// Get the offset of the blob and its indexes
GetOffset(ctx context.Context) (int64, error)
// Commit the uploaded blocks to the BlockBlob
Expand Down Expand Up @@ -187,6 +191,31 @@ func (blockBlob *BlockBlob) Download(ctx context.Context) (io.ReadCloser, error)
return resp.Body, nil
}

// Serve content respecting range header
func (blockBlob *BlockBlob) ServeContent(ctx context.Context, w http.ResponseWriter, r *http.Request) error {
var downloadOptions, err = parseHTTPRange(r)
if err != nil {
return err
}
resp, err := blockBlob.BlobClient.DownloadStream(ctx, downloadOptions)
if err != nil {
return err
}

statusCode := http.StatusOK
if resp.ContentRange != nil {
// Use 206 Partial Content for range requests
statusCode = http.StatusPartialContent
} else if resp.ContentLength != nil && *resp.ContentLength == 0 {
statusCode = http.StatusNoContent
}
w.WriteHeader(statusCode)

_, err = io.Copy(w, resp.Body)
resp.Body.Close()
return err
}

func (blockBlob *BlockBlob) GetOffset(ctx context.Context) (int64, error) {
// Get the offset of the file from azure storage
// For the blob, show each block (ID and size) that is a committed part of it.
Expand Down Expand Up @@ -253,6 +282,11 @@ func (infoBlob *InfoBlob) Download(ctx context.Context) (io.ReadCloser, error) {
return resp.Body, nil
}

// ServeContent is not needed for infoBlob
func (infoBlob *InfoBlob) ServeContent(ctx context.Context, w http.ResponseWriter, r *http.Request) error {
return nil
}

// infoBlob does not utilise offset, so just return 0, nil
func (infoBlob *InfoBlob) GetOffset(ctx context.Context) (int64, error) {
return 0, nil
Expand Down Expand Up @@ -309,3 +343,37 @@ func checkForNotFoundError(err error) error {
}
return err
}

// simple parse http ranging, no multipart ranges/no if-range/no last-modified, not supported by azure anyway
func parseHTTPRange(r *http.Request) (*azblob.DownloadStreamOptions, error) {
rangeHeader := r.Header.Get("Range")
if rangeHeader == "" {
// this is totally fine, Range header is not required
return nil, nil
}

const prefix = "bytes="
if !strings.HasPrefix(rangeHeader, prefix) {
return nil, fmt.Errorf("invalid Range header format")
}

rangeParts := strings.Split(strings.TrimPrefix(rangeHeader, prefix), "-")
if len(rangeParts) != 2 {
return nil, fmt.Errorf("invalid Range header format")
}

offset, err := strconv.ParseInt(rangeParts[0], 10, 64)
if err != nil {
return nil, fmt.Errorf("invalid offset in Range header")
}

count, err := strconv.ParseInt(rangeParts[1], 10, 64)
if err != nil {
return nil, fmt.Errorf("invalid count in Range header")
}

downloadOptions := azblob.DownloadStreamOptions{}
downloadOptions.Range.Offset = offset
downloadOptions.Range.Count = count - offset + 1
return &downloadOptions, nil
}
10 changes: 10 additions & 0 deletions pkg/azurestore/azurestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"fmt"
"io"
"io/fs"
"net/http"
"os"
"strings"

Expand Down Expand Up @@ -47,6 +48,7 @@ func (store AzureStore) UseIn(composer *handler.StoreComposer) {
composer.UseCore(store)
composer.UseTerminater(store)
composer.UseLengthDeferrer(store)
composer.UseContentServer(store)
}

func (store AzureStore) NewUpload(ctx context.Context, info handler.FileInfo) (handler.Upload, error) {
Expand Down Expand Up @@ -149,6 +151,10 @@ func (store AzureStore) AsLengthDeclarableUpload(upload handler.Upload) handler.
return upload.(*AzUpload)
}

func (store AzureStore) AsServableUpload(upload handler.Upload) handler.ServableUpload {
return upload.(*AzUpload)
}

func (upload *AzUpload) WriteChunk(ctx context.Context, offset int64, src io.Reader) (int64, error) {
// Create a temporary file for holding the uploaded data
file, err := os.CreateTemp(upload.tempDir, "tusd-az-tmp-")
Expand Down Expand Up @@ -214,6 +220,10 @@ func (upload *AzUpload) GetReader(ctx context.Context) (io.ReadCloser, error) {
return upload.BlockBlob.Download(ctx)
}

func (upload *AzUpload) ServeContent(ctx context.Context, w http.ResponseWriter, r *http.Request) error {
return upload.BlockBlob.ServeContent(ctx, w, r)
}

// Finish the file upload and commit the block list
func (upload *AzUpload) FinishUpload(ctx context.Context) error {
return upload.BlockBlob.Commit(ctx)
Expand Down
14 changes: 14 additions & 0 deletions pkg/azurestore/azurestore_mock_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 2c0e5df

Please sign in to comment.