Skip to content

Commit

Permalink
implement ContentServerDataStore for azure blob storage too, support …
Browse files Browse the repository at this point in the history
…more headers and Range without rangeEnd
  • Loading branch information
xelat09 committed Jan 12, 2025
1 parent a50bc42 commit 607aafc
Show file tree
Hide file tree
Showing 2 changed files with 84 additions and 0 deletions.
73 changes: 73 additions & 0 deletions pkg/azurestore/azureservice.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"errors"
"fmt"
"io"
"net/http"
"sort"
"strings"

Expand Down Expand Up @@ -67,6 +68,8 @@ type AzBlob interface {
Upload(ctx context.Context, body io.ReadSeeker) error
// Download returns a readcloser to download the contents of the blob
Download(ctx context.Context) (io.ReadCloser, error)
// Serves the contents of the blob directly handling special HTTP headers like Range, if set
ServeContent(ctx context.Context, w http.ResponseWriter, r *http.Request) error
// Get the offset of the blob and its indexes
GetOffset(ctx context.Context) (int64, error)
// Commit the uploaded blocks to the BlockBlob
Expand Down Expand Up @@ -187,6 +190,31 @@ func (blockBlob *BlockBlob) Download(ctx context.Context) (io.ReadCloser, error)
return resp.Body, nil
}

// Serve content respecting range header
func (blockBlob *BlockBlob) ServeContent(ctx context.Context, w http.ResponseWriter, r *http.Request) error {
var downloadOptions, err = parseDownloadOptions(r)
if err != nil {
return err
}
resp, err := blockBlob.BlobClient.DownloadStream(ctx, downloadOptions)
if err != nil {
return err
}

statusCode := http.StatusOK
if resp.ContentRange != nil {
// Use 206 Partial Content for range requests
statusCode = http.StatusPartialContent
} else if resp.ContentLength != nil && *resp.ContentLength == 0 {
statusCode = http.StatusNoContent
}
w.WriteHeader(statusCode)

_, err = io.Copy(w, resp.Body)
resp.Body.Close()
return err
}

func (blockBlob *BlockBlob) GetOffset(ctx context.Context) (int64, error) {
// Get the offset of the file from azure storage
// For the blob, show each block (ID and size) that is a committed part of it.
Expand Down Expand Up @@ -253,6 +281,11 @@ func (infoBlob *InfoBlob) Download(ctx context.Context) (io.ReadCloser, error) {
return resp.Body, nil
}

// ServeContent is not needed for infoBlob
func (infoBlob *InfoBlob) ServeContent(ctx context.Context, w http.ResponseWriter, r *http.Request) error {
return nil
}

// infoBlob does not utilise offset, so just return 0, nil
func (infoBlob *InfoBlob) GetOffset(ctx context.Context) (int64, error) {
return 0, nil
Expand Down Expand Up @@ -309,3 +342,43 @@ func checkForNotFoundError(err error) error {
}
return err
}

// parse the Range, If-Match, If-None-Match, If-Unmodified-Since, If-Modified-Since headers if present
func parseDownloadOptions(r *http.Request) (*azblob.DownloadStreamOptions, error) {
input := azblob.DownloadStreamOptions{AccessConditions: &azblob.AccessConditions{}}

if val := r.Header.Get("Range"); val != "" {
// zero value count indicates from the offset to the resource's end, suffix-length is not required
input.Range = azblob.HTTPRange{Offset: 0, Count: 0}
if _, err := fmt.Sscanf(val, "bytes=%d-%d", &input.Range.Offset, &input.Range.Count); err != nil {
if _, err := fmt.Sscanf(val, "bytes=%d-", &input.Range.Offset); err != nil {
return nil, err
}
}
}
if val := r.Header.Get("If-Match"); val != "" {
etagIfMatch := azcore.ETag(val)
input.AccessConditions.ModifiedAccessConditions.IfMatch = &etagIfMatch
}
if val := r.Header.Get("If-None-Match"); val != "" {
etagIfNoneMatch := azcore.ETag(val)
input.AccessConditions.ModifiedAccessConditions.IfNoneMatch = &etagIfNoneMatch
}
if val := r.Header.Get("If-Modified-Since"); val != "" {
t, err := http.ParseTime(val)
if err != nil {
return nil, err
}
input.AccessConditions.ModifiedAccessConditions.IfModifiedSince = &t

}
if val := r.Header.Get("If-Unmodified-Since"); val != "" {
t, err := http.ParseTime(val)
if err != nil {
return nil, err
}
input.AccessConditions.ModifiedAccessConditions.IfUnmodifiedSince = &t
}

return &input, nil
}
11 changes: 11 additions & 0 deletions pkg/azurestore/azurestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"errors"
"fmt"
"io"
"net/http"
"io/fs"
"os"
"strings"
Expand Down Expand Up @@ -47,6 +48,7 @@ func (store AzureStore) UseIn(composer *handler.StoreComposer) {
composer.UseCore(store)
composer.UseTerminater(store)
composer.UseLengthDeferrer(store)
composer.UseContentServer(store)
}

func (store AzureStore) NewUpload(ctx context.Context, info handler.FileInfo) (handler.Upload, error) {
Expand Down Expand Up @@ -149,6 +151,10 @@ func (store AzureStore) AsLengthDeclarableUpload(upload handler.Upload) handler.
return upload.(*AzUpload)
}

func (store AzureStore) AsServableUpload(upload handler.Upload) handler.ServableUpload {
return upload.(*AzUpload)
}

func (upload *AzUpload) WriteChunk(ctx context.Context, offset int64, src io.Reader) (int64, error) {
// Create a temporary file for holding the uploaded data
file, err := os.CreateTemp(upload.tempDir, "tusd-az-tmp-")
Expand Down Expand Up @@ -214,6 +220,11 @@ func (upload *AzUpload) GetReader(ctx context.Context) (io.ReadCloser, error) {
return upload.BlockBlob.Download(ctx)
}

// Serves the contents of the blob directly handling special HTTP headers like Range, if set
func (upload *AzUpload) ServeContent(ctx context.Context, w http.ResponseWriter, r *http.Request) error {
return upload.BlockBlob.ServeContent(ctx, w, r)
}

// Finish the file upload and commit the block list
func (upload *AzUpload) FinishUpload(ctx context.Context) error {
return upload.BlockBlob.Commit(ctx)
Expand Down

0 comments on commit 607aafc

Please sign in to comment.