[RD] Add Salad's resumable download patches (#3)
* [RD] Annotations key declarations (internal/spec/artifact.go)

Constants for the Annotations map used by resumable downloads

Signed-off-by: Dean Troyer <[email protected]>

* [RD] Copy hashVerifier from go-digest

hashVerifier is non-public in go-digest and we need to supply a pre-initialized Hash

Signed-off-by: Dean Troyer <[email protected]>

* [RD] Add resume to VerifyReader and add hashVerifier (content/reader.go)

* Add resume field to VerifyReader to track resume mode
* Skip setting UnexpectedEOF error in Read() in resume mode
* Add hashVerifier from digest
* Recover serialized Hash object in NewVerifyReader and create a
  content.hashVerifier if in resume mode
* Duplicate tests for resume mode

Signed-off-by: Dean Troyer <[email protected]>

* [RD] Add Store.IngestFile() and Storage.IngestFile() (content/oci/storage.go)

* `ingest()`: calculate hash on existing ingest file contents

Signed-off-by: Dean Troyer <[email protected]>

* [RD] Add Repository.FetchHead() and friends (registry/remote/repository.go)

* BlobStoreHead.FetchHead()
* Repository.FetchHead()
* blobStore.Fetch() checks for Range header support and sets it if so

Signed-off-by: Dean Troyer <[email protected]>

* [RD] Updates to CopyBuffer (internal/ioutil/io.go)

* Handle partial copies in CopyBuffer()
* Add tests (upstream TestUnwrapNopCloser() is failing, not fixed here)

Signed-off-by: Dean Troyer <[email protected]>

* [RD] Set up resume in doCopyNode() (copy.go)

* Look for a partially downloaded file
* Get headers from src to verify Range: support
* Set up resume arguments in desc.Annotations

Signed-off-by: Dean Troyer <[email protected]>

---------

Signed-off-by: Dean Troyer <[email protected]>
dtroyer-salad authored May 1, 2024
1 parent f11378b commit 239949c
Showing 14 changed files with 1,127 additions and 28 deletions.
120 changes: 120 additions & 0 deletions README.Salad.md
@@ -0,0 +1,120 @@
# Salad Extension to ORAS Go Library

## TL;DR

Restarting a failed download by deleting the partially downloaded layer is a high cost
to pay in the Salad network, especially when some of these layers may exceed 10GB.
Resuming partial downloads is an important part of a robust, resilient, and performant
distributed compute node with limited bandwidth.

This repository is a fork of https://github.com/oras-project/oras-go/ taken just after the
v2.5.0 tag. The [`oras-main`](https://github.com/saladtechnologies/oras-go/tree/oras-main)
branch tracks the upstream [`main`](https://github.com/oras-project/oras-go/tree/main)
branch, while [`main`](https://github.com/saladtechnologies/oras-go/tree/main) contains
Salad's download changes. The only change required to build the ORAS CLI (`oras`)
(https://github.com/oras-project/oras) is to use this repository as a replacement
for `oras-go`.

Typically the only change required is a `replace` line in `go.mod`, setting the
pseudo-version to the desired commit:

```
replace oras.land/oras-go/v2 v2.5.0 => github.com/saladtechnologies/oras-go/v2 v2.0.0-20240409062726-11d464f8432e
```

## Summary

### Resumable Downloads

This resumable download implementation is contained entirely within `oras-go`, in the code
path below `oras.doCopyNode()`. Care has been taken not to alter the existing external
interfaces, although some new ones have been added. Resumable download is always enabled,
but the preconditions are carefully checked and the code falls back to the original path
when resuming is not possible. This implementation does not include any way to force
resume on (fail if not possible) or off (do not attempt even when possible).

Resumable downloads are limited to remote registry source targets and local storage destination
targets. This is the use case for prepping containerd images.

### Changes

* `Annotations` key constants (`internal/spec/artifact.go`)
* `AnnotationResume*` - the keys used in the `Annotations` map
* The Annotations field of the Descriptor is used to pass state around during the request handling. This avoids changing the public API via interfaces or structs.
* Salad-specific keys are defined in `internal/spec/artifact.go` using constants with names beginning with `AnnotationResume`.

* `oras.doCopyNode()` (`copy.go`)
* Look for files in the ingest directory that match the current `Descriptor` being downloaded
* if found: save full filename and file size to the `Annotations` map for the `Descriptor`
* if not found: nothing to see here, proceed as normal

* `remote.FetcherHead` (`registry/remote/repository.go`)
* interface defining `FetchHead()`

* `remote.BlobStoreHead` (`registry/remote/repository.go`)
* interface combining `registry.BlobStore` with `FetcherHead`

* `remote.Repository.FetchHead()` (new) (`registry/remote/repository.go`)
* call `FetchHead()` when `BlobStoreHead` is implemented

* `remote.blobStore` (`registry/remote/repository.go`)
* `blobStore.Fetch()`
* call `FetchHead()` to check for the `Range` header support from the server
* FALSE:
* reset resume flag and proceed as usual
* TRUE:
* Set `Range` header
* after the GET request to the remote repository, if in resume mode
* `StatusPartialContent`:
* check response `ContentLength` against `target.Size - ingestSize`
* `StatusOK`:
* check response `ContentLength` against `target.Size`
* `blobStore.FetchHead()` (new)
* do HEAD call to src
* `StatusOK`:
* check response `ContentLength` against `target.Size`
* check response header `Accept-Ranges` has value `bytes`
* TRUE:
* Set resume flag

* `content.Storage.Push()` (`content/oci/storage.go`)
* call `Storage.ingest()` as usual

* `content.Storage.ingest()` (`content/oci/storage.go`)
* if resume conditions are all met
* TRUE:
* open existing ingest file
* seek to 0 in ingest file
* create a new Hash to contain the current hash of the ingest file
* save encoded Hash to `Annotations[hash]`
* FALSE:
* if not found: `CreateTemp()` a new ingest file as usual
* if `0 <= ingest size < content-length`
* TRUE:
* call `ioutil.CopyBuffer()` as usual

* `ioutil.CopyBuffer()` (`internal/ioutil/io.go`)
* call `content.NewVerifyReader()` as usual
* handle `io.ErrUnexpectedEOF`: check `bytes read == desc.Size - ingestSize`

* `content.NewVerifyReader()` (`content/reader.go`)
* Add `resume` field to `VerifyReader` struct
* if `Annotations[offset]` > 0
* TRUE:
* decode `Annotations[Hash]`
* create a new `content.hashVerifier` with the new `Hash` and the original `desc.Digest`
* FALSE:
* create a new `digest.hashVerifier` from `desc.Digest`

* `content.hashVerifier` (new) (`content/verifiers.go`)
* `digest.hashVerifier` is copied here from `opencontainers/go-digest/blob/master/verifiers.go`
because it is private and we need to construct a verifier with our new `Hash` and the original `Digest`.

* `content.Resumer` (new) (`content/storage.go`)
* Interface to get ingest filenames, also used to determine support for resumable downloads

* `content.Store.IngestFile()` (new) (`content/oci/storage.go`)
* Provide access to `content.Store.storage.IngestFile()`

* `content.Storage.IngestFile()` (new) (`content/oci/storage.go`)
* Locate and return the first matching ingest file, if any
5 changes: 5 additions & 0 deletions content/oci/oci.go
@@ -155,6 +155,11 @@ func (s *Store) Exists(ctx context.Context, target ocispec.Descriptor) (bool, er
return s.storage.Exists(ctx, target)
}

// IngestFile returns the ingest file matching name from the storage layer
func (s *Store) IngestFile(name string) string {
return s.storage.IngestFile(name)
}

// Delete deletes the content matching the descriptor from the store. Delete may
// fail on certain systems (i.e. NTFS), if there is a process (i.e. an unclosed
// Reader) using target. If s.AutoGC is set to true, Delete will recursively
81 changes: 70 additions & 11 deletions content/oci/storage.go
@@ -23,11 +23,14 @@ import (
"io/fs"
"os"
"path/filepath"
"strconv"
"sync"

ocispec "github.com/opencontainers/image-spec/specs-go/v1"
"oras.land/oras-go/v2/content"
"oras.land/oras-go/v2/errdef"
"oras.land/oras-go/v2/internal/ioutil"
"oras.land/oras-go/v2/internal/spec"
)

// bufPool is a pool of byte buffers that can be reused for copying content
@@ -107,6 +110,18 @@ func (s *Storage) Push(_ context.Context, expected ocispec.Descriptor, content i
return nil
}

// IngestFile returns the ingest file matching name
func (s *Storage) IngestFile(name string) string {
ingestFiles, err := filepath.Glob(filepath.Join(s.ingestRoot, name+"_*"))
if err != nil || len(ingestFiles) == 0 {
// Error or no files found
return ""
}
// Found at least one file; return the first one
// TODO: Look for the largest matching file?
return ingestFiles[0]
}

// Delete removes the target from the system.
func (s *Storage) Delete(ctx context.Context, target ocispec.Descriptor) error {
path, err := blobPath(target.Digest)
@@ -125,18 +140,59 @@ func (s *Storage) Delete(ctx context.Context, target ocispec.Descriptor) error {
}

// ingest writes the content into a temporary ingest file.
func (s *Storage) ingest(expected ocispec.Descriptor, content io.Reader) (path string, ingestErr error) {
func (s *Storage) ingest(expected ocispec.Descriptor, contentReader io.Reader) (path string, ingestErr error) {
if err := ensureDir(s.ingestRoot); err != nil {
return "", fmt.Errorf("failed to ensure ingest dir: %w", err)
}

// create a temp file with the file name format "blobDigest_randomString"
// in the ingest directory.
// Go ensures that multiple programs or goroutines calling CreateTemp
// simultaneously will not choose the same file.
fp, err := os.CreateTemp(s.ingestRoot, expected.Digest.Encoded()+"_*")
// Resume Download
resume := expected.Annotations[spec.AnnotationResumeDownload]
ingestFile := expected.Annotations[spec.AnnotationResumeFilename]
ingestSize, err := strconv.ParseInt(expected.Annotations[spec.AnnotationResumeOffset], 10, 64)
if err != nil {
return "", fmt.Errorf("failed to create ingest file: %w", err)
ingestSize = 0
}

// See if a partial ingest file with this hash already exists
var fp *os.File
if resume == "true" && ingestFile != "" && ingestSize > 0 && ingestSize < expected.Size {
// Found a suitable ingest file
fp, err = os.OpenFile(ingestFile, os.O_RDWR|os.O_APPEND, 0o600)
if err != nil {
return "", fmt.Errorf("failed to open partial ingest file: %w", err)
}

// Rewind file to re-verify current contents on disk
fp.Seek(0, io.SeekStart)

// Make a new Hash and update for current contents on disk
newHash := expected.Digest.Algorithm().Hash()
if n, err := io.Copy(newHash, fp); err != nil || n != ingestSize {
// If error, assume we can't use what is there
ingestSize = 0
} else {
eh, err := content.EncodeHash(newHash)
if err != nil {
// oops, still can't resume...
ingestSize = 0
} else {
expected.Annotations[spec.AnnotationResumeHash] = eh
}
}
if ingestSize == 0 {
// Reset file pointer
fp.Seek(0, io.SeekStart)
}
} else {
// No partial ingest files found
// create a temp file with the file name format "blobDigest_randomString"
// in the ingest directory.
// Go ensures that multiple programs or goroutines calling CreateTemp
// simultaneously will not choose the same file.
fp, err = os.CreateTemp(s.ingestRoot, expected.Digest.Encoded()+"_*")
if err != nil {
return "", fmt.Errorf("failed to create ingest file: %w", err)
}
}

path = fp.Name()
Expand All @@ -149,10 +205,13 @@ func (s *Storage) ingest(expected ocispec.Descriptor, content io.Reader) (path s
}()
defer fp.Close()

buf := bufPool.Get().(*[]byte)
defer bufPool.Put(buf)
if err := ioutil.CopyBuffer(fp, content, *buf, expected); err != nil {
return "", fmt.Errorf("failed to ingest: %w", err)
// Copy downloaded bits to ingest file only if we do not already have it all
if ingestSize >= 0 && ingestSize < expected.Size {
buf := bufPool.Get().(*[]byte)
defer bufPool.Put(buf)
if err := ioutil.CopyBuffer(fp, contentReader, *buf, expected); err != nil {
return "", fmt.Errorf("failed to ingest: %w", err)
}
}

// change to readonly
