From 531126579cd30e390ab6efa780979d9b52fa13bc Mon Sep 17 00:00:00 2001 From: N-o-Z Date: Wed, 9 Aug 2023 13:19:07 +0300 Subject: [PATCH] Implement lakectl local clone (#6320) * Implement lakectl local clone * Fix unit test * CR Fixes * More fixes --- cmd/lakectl/cmd/local.go | 31 ++++ cmd/lakectl/cmd/local_clone.go | 102 +++++++++++++ cmd/lakectl/cmd/local_init.go | 64 ++++---- cmd/lakectl/cmd/local_list.go | 2 +- docs/reference/cli.md | 24 ++- go.mod | 3 +- go.sum | 2 + pkg/fileutil/io.go | 74 ++++++++-- pkg/fileutil/io_test.go | 141 +++++++++++++++++- pkg/local/errors.go | 8 + pkg/local/index.go | 6 +- pkg/local/index_test.go | 3 +- pkg/local/progress.go | 129 ++++++++++++++++ pkg/local/sync.go | 262 +++++++++++++++++++++++++++++++++ pkg/uri/parser.go | 9 ++ 15 files changed, 809 insertions(+), 51 deletions(-) create mode 100644 cmd/lakectl/cmd/local_clone.go create mode 100644 pkg/local/errors.go create mode 100644 pkg/local/progress.go create mode 100644 pkg/local/sync.go diff --git a/cmd/lakectl/cmd/local.go b/cmd/lakectl/cmd/local.go index 4ad28066bcd..f919c8b0183 100644 --- a/cmd/lakectl/cmd/local.go +++ b/cmd/lakectl/cmd/local.go @@ -4,6 +4,37 @@ import ( "github.com/spf13/cobra" ) +const ( + localDefaultSyncParallelism = 25 + localDefaultSyncPresign = true +) + +func withParallelismFlag(cmd *cobra.Command) { + cmd.Flags().IntP("parallelism", "p", localDefaultSyncParallelism, + "Max concurrent operations to perform") +} + +func withPresignFlag(cmd *cobra.Command) { + cmd.Flags().Bool("presign", localDefaultSyncPresign, + "Use pre-signed URLs when downloading/uploading data (recommended)") +} + +func withLocalSyncFlags(cmd *cobra.Command) { + withParallelismFlag(cmd) + withPresignFlag(cmd) +} + +type syncFlags struct { + parallelism int + presign bool +} + +func getLocalSyncFlags(cmd *cobra.Command) syncFlags { + parallelism := Must(cmd.Flags().GetInt("parallelism")) + presign := Must(cmd.Flags().GetBool("presign")) + return syncFlags{parallelism: parallelism, presign: presign} +} + var localCmd = &cobra.Command{ Use: "local", // TODO: Remove BETA when feature complete diff --git a/cmd/lakectl/cmd/local_clone.go b/cmd/lakectl/cmd/local_clone.go new file mode 100644 index 00000000000..d1cb2874175 --- /dev/null +++ b/cmd/lakectl/cmd/local_clone.go @@ -0,0 +1,102 @@ +package cmd + +import ( + "errors" + "fmt" + "io/fs" + "net/http" + "path/filepath" + "strings" + + "github.com/go-openapi/swag" + "github.com/spf13/cobra" + "github.com/treeverse/lakefs/pkg/api" + "github.com/treeverse/lakefs/pkg/fileutil" + "github.com/treeverse/lakefs/pkg/local" + "github.com/treeverse/lakefs/pkg/uri" +) + +const ( + localCloneMinArgs = 1 + localCloneMaxArgs = 2 + filesChanSize = 1000 +) + +var localCloneCmd = &cobra.Command{ + Use: "clone [directory]", + Short: "Clone a path from a lakeFS repository into a new directory.", + Args: cobra.RangeArgs(localCloneMinArgs, localCloneMaxArgs), + Run: func(cmd *cobra.Command, args []string) { + remote := MustParsePathURI("path", args[0]) + dir := "." + if len(args) == localCloneMaxArgs { + dir = args[1] + } + syncFlags := getLocalSyncFlags(cmd) + localPath, err := filepath.Abs(dir) + if err != nil { + DieErr(err) + } + + empty, err := fileutil.IsDirEmpty(localPath) + if err != nil { + if !errors.Is(err, fs.ErrNotExist) { + DieErr(err) + } + } else if !empty { + DieFmt("directory '%s' exists and is not empty", localPath) + } + + idx, err := localInit(cmd.Context(), localPath, remote, false) + if err != nil { + DieErr(err) + } + stableRemote := remote.WithRef(idx.AtHead) + client := getClient() + // Dynamically construct changes + c := make(chan *local.Change, filesChanSize) + go func() { + defer close(c) + hasMore := true + var after string + for hasMore { + listResp, err := client.ListObjectsWithResponse(cmd.Context(), remote.Repository, stableRemote.Ref, &api.ListObjectsParams{ + After: (*api.PaginationAfter)(swag.String(after)), + Prefix: (*api.PaginationPrefix)(remote.Path), + UserMetadata: swag.Bool(true), + }) + DieOnErrorOrUnexpectedStatusCode(listResp, err, http.StatusOK) + + for _, o := range listResp.JSON200.Results { + path := strings.TrimPrefix(o.Path, remote.GetPath()) + // skip directory markers + if path == "" || (strings.HasSuffix(path, uri.PathSeparator) && swag.Int64Value(o.SizeBytes) == 0) { + continue + } + path = strings.TrimPrefix(path, uri.PathSeparator) + c <- &local.Change{ + Source: local.ChangeSourceRemote, + Path: path, + Type: local.ChangeTypeAdded, + } + } + hasMore = listResp.JSON200.Pagination.HasMore + after = listResp.JSON200.Pagination.NextOffset + } + }() + + s := local.NewSyncManager(cmd.Context(), client, syncFlags.parallelism, syncFlags.presign) + err = s.Sync(localPath, stableRemote, c) + + if err != nil { + DieErr(err) + } + fmt.Printf("Successfully cloned %s to %s.\nTotal objects downloaded:\t%d\n", remote, localPath, s.Summary().Downloaded) + }, +} + +//nolint:gochecknoinits +func init() { + withLocalSyncFlags(localCloneCmd) + localCmd.AddCommand(localCloneCmd) +} diff --git a/cmd/lakectl/cmd/local_init.go b/cmd/lakectl/cmd/local_init.go index ef942086ca8..043b7e94e2a 100644 --- a/cmd/lakectl/cmd/local_init.go +++ b/cmd/lakectl/cmd/local_init.go @@ -1,14 +1,17 @@ package cmd import ( + "context" "errors" "fmt" + "io/fs" "os" "path/filepath" "github.com/spf13/cobra" "github.com/treeverse/lakefs/pkg/git" "github.com/treeverse/lakefs/pkg/local" + "github.com/treeverse/lakefs/pkg/uri" ) const ( @@ -16,9 +19,38 @@ const ( localInitMaxArgs = 2 ) +func localInit(ctx context.Context, dir string, remote *uri.URI, force bool) (*local.Index, error) { + if err := os.MkdirAll(dir, os.ModePerm); err != nil { + DieErr(err) + } + exists, err := local.IndexExists(dir) + if err != nil { + return nil, err + } + if exists && !force { + return nil, fs.ErrExist + } + + // dereference + head := resolveCommitOrDie(ctx, getClient(), remote.Repository, remote.Ref) + idx, err := local.WriteIndex(dir, remote, head) + if err != nil { + return nil, err + } + + ignoreFile, err := git.Ignore(dir, []string{dir}, []string{filepath.Join(dir, local.IndexFileName)}, local.IgnoreMarker) + if err == nil { + fmt.Println("location added to", ignoreFile) + } else if !errors.Is(err, git.ErrNotARepository) { + return nil, err + } + + return idx, nil +} + var localInitCmd = &cobra.Command{ Use: "init [directory]", - Short: "set a local directory to sync with a lakeFS path", + Short: "set a local directory to sync with a lakeFS path.", Args: cobra.RangeArgs(localInitMinArgs, localInitMaxArgs), Run: func(cmd *cobra.Command, args []string) { remote := MustParsePathURI("path", args[0]) @@ -26,36 +58,17 @@ var localInitCmd = &cobra.Command{ if len(args) == localInitMaxArgs { dir = args[1] } - flagSet := cmd.Flags() - force := Must(flagSet.GetBool("force")) - localPath, err := filepath.Abs(dir) if err != nil { DieErr(err) } + force := Must(cmd.Flags().GetBool("force")) - if err := os.MkdirAll(dir, os.ModePerm); err != nil { - DieErr(err) - } - exists, err := local.IndexExists(localPath) - if err != nil { - DieErr(err) - } - if exists && !force { - DieFmt("directory '%s' already linked to a lakefs path, run command with --force to overwrite", localPath) - } - - // dereference - head := resolveCommitOrDie(cmd.Context(), getClient(), remote.Repository, remote.Ref) - err = local.WriteIndex(localPath, remote, head) + _, err = localInit(cmd.Context(), localPath, remote, force) if err != nil { - DieErr(err) - } - - ignoreFile, err := git.Ignore(localPath, []string{localPath, local.IndexFileName}, []string{local.IndexFileName}, local.IgnoreMarker) - if err == nil { - fmt.Println("location added to", ignoreFile) - } else if !errors.Is(err, git.ErrNotARepository) { + if errors.Is(err, fs.ErrExist) { + DieFmt("directory '%s' already linked to a lakeFS path, run command with --force to overwrite", dir) + } DieErr(err) } @@ -65,7 +78,6 @@ var localInitCmd = &cobra.Command{ //nolint:gochecknoinits func init() { - AssignAutoConfirmFlag(localInitCmd.Flags()) localInitCmd.Flags().Bool("force", false, "Overwrites if directory already linked to a lakeFS path") localCmd.AddCommand(localInitCmd) } diff --git a/cmd/lakectl/cmd/local_list.go b/cmd/lakectl/cmd/local_list.go index f63329fad6c..f9f32b84b40 100644 --- a/cmd/lakectl/cmd/local_list.go +++ b/cmd/lakectl/cmd/local_list.go @@ -19,7 +19,7 @@ const ( var localListCmd = &cobra.Command{ Use: "list [directory]", - Short: "find and list directories that are synced with lakeFS", + Short: "find and list directories that are synced with lakeFS.", Args: cobra.RangeArgs(localListMinArgs, localListMaxArgs), Run: func(cmd *cobra.Command, args []string) { dir := "." diff --git a/docs/reference/cli.md b/docs/reference/cli.md index b6bb178d66e..a0130f44639 100644 --- a/docs/reference/cli.md +++ b/docs/reference/cli.md @@ -2574,6 +2574,25 @@ BETA: sync local directories with lakeFS paths +### lakectl local clone + +Clone a path from a lakeFS repository into a new directory. + +``` +lakectl local clone [directory] [flags] +``` + +#### Options +{:.no_toc} + +``` + -h, --help help for clone + -p, --parallelism int Max concurrent operations to perform (default 25) + --presign Use pre-signed URLs when downloading/uploading data (recommended) (default true) +``` + + + ### lakectl local help Help about any command @@ -2599,7 +2618,7 @@ lakectl local help [command] [flags] ### lakectl local init -set a local directory to sync with a lakeFS path +set a local directory to sync with a lakeFS path. ``` lakectl local init [directory] [flags] @@ -2611,14 +2630,13 @@ lakectl local init [directory] [flags] ``` --force Overwrites if directory already linked to a lakeFS path -h, --help help for init - -y, --yes Automatically say yes to all confirmations ``` ### lakectl local list -find and list directories that are synced with lakeFS +find and list directories that are synced with lakeFS. ``` lakectl local list [directory] [flags] diff --git a/go.mod b/go.mod index e0bdef7826d..9c161c32fe8 100644 --- a/go.mod +++ b/go.mod @@ -88,6 +88,7 @@ require ( github.com/hashicorp/go-retryablehttp v0.7.2 github.com/hashicorp/go-version v1.6.0 github.com/jackc/pgx/v5 v5.3.1 + github.com/karrick/godirwalk v1.17.0 github.com/puzpuzpuz/xsync v1.5.2 go.uber.org/ratelimit v0.2.0 ) @@ -142,7 +143,7 @@ require ( github.com/docker/cli v23.0.6+incompatible // indirect github.com/docker/docker v23.0.6+incompatible // indirect github.com/docker/go-connections v0.4.0 // indirect - github.com/docker/go-units v0.5.0 + github.com/docker/go-units v0.5.0 // indirect github.com/dustin/go-humanize v1.0.0 // indirect github.com/fatih/color v1.15.0 // indirect github.com/ghodss/yaml v1.0.0 // indirect diff --git a/go.sum b/go.sum index 0d99b0850e8..10dbff06767 100644 --- a/go.sum +++ b/go.sum @@ -506,6 +506,8 @@ github.com/jung-kurt/gofpdf v1.0.0/go.mod h1:7Id9E/uU8ce6rXgefFLlgrJj/GYY22cpxn+ github.com/jung-kurt/gofpdf v1.0.3-0.20190309125859-24315acbbda5/go.mod h1:7Id9E/uU8ce6rXgefFLlgrJj/GYY22cpxn+r32jIOes= github.com/k0kubun/colorstring v0.0.0-20150214042306-9440f1994b88/go.mod h1:3w7q1U84EfirKl04SVQ/s7nPm1ZPhiXd34z40TNz36k= github.com/k0kubun/go-ansi v0.0.0-20180517002512-3bf9e2903213/go.mod h1:vNUNkEQ1e29fT/6vq2aBdFsgNPmy8qMdSay1npru+Sw= +github.com/karrick/godirwalk v1.17.0 h1:b4kY7nqDdioR/6qnbHQyDvmA17u5G1cZ6J+CZXwSWoI= +github.com/karrick/godirwalk v1.17.0/go.mod h1:j4mkqPuvaLI8mp1DroR3P6ad7cyYd4c1qeJ3RV7ULlk= github.com/kataras/golog v0.0.9/go.mod h1:12HJgwBIZFNGL0EJnMRhmvGA0PQGx8VFwrZtM4CqbAk= github.com/kataras/golog v0.0.10/go.mod h1:yJ8YKCmyL+nWjERB90Qwn+bdyBZsaQwU3bTVFgkFIp8= github.com/kataras/iris/v12 v12.0.1/go.mod h1:udK4vLQKkdDqMGJJVd/msuMtN6hpYJhg/lSzuxjhO+U= diff --git a/pkg/fileutil/io.go b/pkg/fileutil/io.go index 8eb66ff5231..431d9a74848 100644 --- a/pkg/fileutil/io.go +++ b/pkg/fileutil/io.go @@ -6,6 +6,8 @@ import ( "io/fs" "os" "path/filepath" + + "github.com/karrick/godirwalk" ) const ( @@ -21,19 +23,19 @@ func IsDir(p string) (bool, error) { return stat.IsDir(), nil } -func FindInParents(path, filename string) (string, error) { +// FindInParents Returns the first occurrence of filename going up the dir tree +func FindInParents(dir, filename string) (string, error) { var lookup string - fullPath, err := filepath.Abs(path) + fullPath, err := filepath.Abs(dir) if err != nil { return "", err } for fullPath != string(filepath.Separator) && fullPath != filepath.VolumeName(fullPath) { info, err := os.Stat(fullPath) - if errors.Is(err, fs.ErrNotExist) { - return "", fmt.Errorf("%s: %w", fullPath, fs.ErrNotExist) - } else if err != nil { - return "", err + if err != nil { + return "", fmt.Errorf("%s: %w", fullPath, err) } + if !info.IsDir() { // find filename here lookup = filepath.Join(filepath.Dir(fullPath), filename) @@ -41,13 +43,63 @@ func FindInParents(path, filename string) (string, error) { lookup = filepath.Join(fullPath, filename) } _, err = os.Stat(lookup) - if os.IsNotExist(err) { - fullPath = filepath.Dir(fullPath) - continue - } else if err != nil { + if err == nil { + return lookup, nil + } + if !errors.Is(err, fs.ErrNotExist) { return "", err } - return lookup, nil + // error == fs.ErrNotExist + fullPath = filepath.Dir(fullPath) } return "", nil } + +func IsDirEmpty(dir string) (bool, error) { + s, err := godirwalk.NewScanner(dir) + if err != nil { + return false, err + } + // Attempt to read only the first directory entry. Note that Scan skips both "." and ".." entries. + hasAtLeastOneChild := s.Scan() + if err = s.Err(); err != nil { + return false, err + } + + if hasAtLeastOneChild { + return false, nil + } + return true, nil +} + +// PruneEmptyDirectories iterates through the directory tree, removing empty directories, and directories that only +// contain empty directories. +func PruneEmptyDirectories(dir string) ([]string, error) { + var pruned []string + + err := godirwalk.Walk(dir, &godirwalk.Options{ + Unsorted: true, + Callback: func(_ string, _ *godirwalk.Dirent) error { + // no-op while diving in; all the fun happens in PostChildrenCallback + return nil + }, + PostChildrenCallback: func(d string, _ *godirwalk.Dirent) error { + empty, err := IsDirEmpty(d) + if err != nil { + return err + } + + if d == dir || !empty { // do not remove top level directory or a directory with at least one child + return nil + } + + err = os.Remove(d) + if err == nil { + pruned = append(pruned, d) + } + return err + }, + }) + + return pruned, err +} diff --git a/pkg/fileutil/io_test.go b/pkg/fileutil/io_test.go index 437a8a3faff..980d9f44c46 100644 --- a/pkg/fileutil/io_test.go +++ b/pkg/fileutil/io_test.go @@ -1,8 +1,10 @@ package fileutil_test import ( + "io/fs" "os" "path/filepath" + "strings" "testing" "github.com/stretchr/testify/require" @@ -13,11 +15,9 @@ func TestFindInParents(t *testing.T) { root := t.TempDir() dirTree := filepath.Join(root, "foo", "bar", "baz", "taz") require.NoError(t, os.MkdirAll(dirTree, fileutil.DefaultDirectoryMask)) - t.Run("does not exist", func(t *testing.T) { - found, err := fileutil.FindInParents(root, ".doesnotexist21348329043289") - if err != nil { - t.Fatal(err) - } + t.Run("dir does not exist", func(t *testing.T) { + found, err := fileutil.FindInParents(filepath.Join(root, "no_dir"), "file") + require.ErrorIs(t, err, fs.ErrNotExist) if found != "" { t.Errorf("expected found to be empty, got %v", found) } @@ -83,3 +83,134 @@ func TestFindInParents(t *testing.T) { }) } } + +func TestPruneEmptyDirectories(t *testing.T) { + root := t.TempDir() + + cases := []struct { + name string + paths []string + expected []string + }{ + { + name: "prune_deep", + paths: []string{ + "a/b/", + "a/b/c.txt", + "a/d/", + "a/e/a/b/", + }, + expected: []string{ + "a/d", + "a/e", + "a/e/a", + "a/e/a/b", + }, + }, + { + name: "prune_deep_keep_neighbor", + paths: []string{ + "a/b/", + "a/b/c.txt", + "a/d/", + "a/e/a/b/", + "b.txt", + "c/", + "c/b.txt", + "d/a/b/", + }, + expected: []string{ + "a/d", + "a/e", + "a/e/a", + "a/e/a/b", + "d", + "d/a", + "d/a/b", + }, + }, + { + name: "prune_keep_root", + paths: []string{}, + expected: []string{}, + }, + { + name: "prune_all", + paths: []string{ + "a/", + "a/b/", + "a/b/c/", + "a/d/", + "a/e/", + "a/e/a/", + "a/e/a/b/", + }, + expected: []string{ + "a", + "a/b", + "a/b/c", + "a/d", + "a/e", + "a/e/a", + "a/e/a/b", + }, + }, + { + name: "nothing_to_prune", + paths: []string{ + "a/b/", + "a/b/c.txt", + "d.txt", + }, + expected: []string{}, + }, + } + + for _, tt := range cases { + t.Run(tt.name, func(t *testing.T) { + var files []string + + currentRoot := filepath.Join(root, tt.name) + require.NoError(t, os.Mkdir(currentRoot, fileutil.DefaultDirectoryMask)) + + // create directory tree + for _, entry := range tt.paths { + fullPath := filepath.Join(currentRoot, entry) + if strings.HasSuffix(entry, string(os.PathSeparator)) { + // create dir + require.NoError(t, os.MkdirAll(fullPath, fileutil.DefaultDirectoryMask)) + } else { + // create file + f, err := os.Create(fullPath) + require.NoError(t, err) + require.NoError(t, f.Close()) + files = append(files, f.Name()) + } + } + + // prune + removedDirs, err := fileutil.PruneEmptyDirectories(currentRoot) + require.NoError(t, err) + + // make relative + removedDirsRel := make([]string, len(removedDirs)) + for i, d := range removedDirs { + relPath, _ := filepath.Rel(currentRoot, d) + removedDirsRel[i] = relPath + } + + // Verify root + _, err = fileutil.IsDir(currentRoot) + require.NoError(t, err) + + // Compare pruned list + require.ElementsMatch(t, tt.expected, removedDirsRel) + + // Verify files + for _, f := range files { + _, err = os.ReadFile(f) + require.NoError(t, err) + } + }) + } +} diff --git a/pkg/local/errors.go b/pkg/local/errors.go new file mode 100644 index 00000000000..150c7a4e9b9 --- /dev/null +++ b/pkg/local/errors.go @@ -0,0 +1,8 @@ +package local + +import "errors" + +var ( + ErrConflict = errors.New("conflict") + ErrDownloadingFile = errors.New("error downloading file") +) diff --git a/pkg/local/index.go b/pkg/local/index.go index c8b82329f7d..71243001dbd 100644 --- a/pkg/local/index.go +++ b/pkg/local/index.go @@ -35,17 +35,17 @@ func (l *Index) GetCurrentURI() (*uri.URI, error) { return uri.Parse(l.PathURI) } -func WriteIndex(path string, remote *uri.URI, atHead string) error { +func WriteIndex(path string, remote *uri.URI, atHead string) (*Index, error) { idx := &Index{ PathURI: remote.String(), AtHead: atHead, } data, err := yaml.Marshal(idx) if err != nil { - return err + return nil, err } idxPath := filepath.Join(path, IndexFileName) - return os.WriteFile(idxPath, data, IndexFileMode) + return idx, os.WriteFile(idxPath, data, IndexFileMode) } func IndexExists(baseAbs string) (bool, error) { diff --git a/pkg/local/index_test.go b/pkg/local/index_test.go index 78b7543d1e9..e3528fc3ef1 100644 --- a/pkg/local/index_test.go +++ b/pkg/local/index_test.go @@ -29,7 +29,8 @@ var ( ) func writeIndex(t *testing.T, dir string) { - require.NoError(t, local.WriteIndex(dir, testUri, head)) + _, err := local.WriteIndex(dir, testUri, head) + require.NoError(t, err) } func TestWriteIndex(t *testing.T) { diff --git a/pkg/local/progress.go b/pkg/local/progress.go new file mode 100644 index 00000000000..a40c49d12fd --- /dev/null +++ b/pkg/local/progress.go @@ -0,0 +1,129 @@ +package local + +import ( + "io" + "time" + + "github.com/jedib0t/go-pretty/v6/progress" +) + +const ( + progressTrackerLength = 20 + progressTrackerWidth = 40 + progressTrackerDone = 100 + progressTrackerUpdateFrequency = 50 * time.Millisecond +) + +type ProgressUpdaterReader struct { + r io.Reader + t *progress.Tracker +} + +func (pu *ProgressUpdaterReader) Read(p []byte) (n int, err error) { + n, err = pu.r.Read(p) + pu.t.Increment(int64(n)) + if err == io.EOF { + pu.t.MarkAsDone() + } else if err != nil { + pu.t.IncrementWithError(int64(n)) + } + return +} + +type ProgressUpdater struct { + t *progress.Tracker +} + +func (p *ProgressUpdater) Reader(reader io.Reader) io.Reader { + return &ProgressUpdaterReader{ + r: reader, + t: p.t, + } +} + +func (p *ProgressUpdater) Done() { + p.t.MarkAsDone() +} + +func (p *ProgressUpdater) Error() { + p.t.MarkAsErrored() +} + +type ProgressSpinner struct { + t *progress.Tracker +} + +func (p *ProgressSpinner) Done() { + p.t.MarkAsDone() +} + +type ProgressPool struct { + pw progress.Writer + done chan bool +} + +func (p *ProgressPool) Start() { + const tickerDuration = 25 * time.Millisecond + go p.pw.Render() + go func() { + t := time.NewTicker(tickerDuration) + for { + select { + case <-p.done: + t.Stop() + return + case <-t.C: + } + } + }() +} +func (p *ProgressPool) Stop() { + p.pw.Stop() + p.done <- true + // according to examples, give enough time for the last render loop to complete. + for p.pw.IsRenderInProgress() { + const renderSleep = 5 * time.Millisecond + time.Sleep(renderSleep) + } +} + +func (p *ProgressPool) AddReader(name string, sizeBytes int64) *ProgressUpdater { + tracker := &progress.Tracker{ + Message: name, + Total: sizeBytes, + Units: progress.UnitsBytes, + } + p.pw.AppendTracker(tracker) + return &ProgressUpdater{t: tracker} +} + +func (p *ProgressPool) AddSpinner(name string) *ProgressSpinner { + tracker := &progress.Tracker{ + Message: name, + Total: progressTrackerDone, + Units: progress.Units{ + Notation: "%", + Formatter: progress.FormatNumber, + }, + } + p.pw.AppendTracker(tracker) + return &ProgressSpinner{t: tracker} +} + +func NewProgressPool() *ProgressPool { + pw := progress.NewWriter() + pw.SetAutoStop(false) // important + pw.SetTrackerLength(progressTrackerLength) + pw.SetMessageWidth(progressTrackerWidth) + pw.SetSortBy(progress.SortByValue) + pw.SetStyle(progress.StyleDefault) + pw.SetTrackerPosition(progress.PositionRight) + pw.SetUpdateFrequency(progressTrackerUpdateFrequency) + pw.Style().Colors = progress.StyleColorsExample + pw.Style().Options.PercentFormat = "%4.1f%%" + + return &ProgressPool{ + pw: pw, + done: make(chan bool), + } +} diff --git a/pkg/local/sync.go b/pkg/local/sync.go new file mode 100644 index 00000000000..d900da1896d --- /dev/null +++ b/pkg/local/sync.go @@ -0,0 +1,262 @@ +package local + +import ( + "context" + "errors" + "fmt" + "io" + "net/http" + "os" + "path/filepath" + "strconv" + "strings" + "sync/atomic" + "syscall" + "time" + + "github.com/go-openapi/swag" + "github.com/treeverse/lakefs/pkg/api" + "github.com/treeverse/lakefs/pkg/fileutil" + "github.com/treeverse/lakefs/pkg/uri" + "golang.org/x/sync/errgroup" +) + +const ( + DefaultDirectoryMask = 0o755 + ClientMtimeMetadataKey = "x-client-mtime" +) + +type ChangeSource int + +const ( + ChangeSourceRemote ChangeSource = iota + ChangeSourceLocal +) + +type ChangeType int + +const ( + ChangeTypeAdded = iota + ChangeTypeModified + ChangeTypeRemoved + ChangeTypeConflict +) + +type Change struct { + Source ChangeSource + Path string + Type ChangeType +} + +func getMtimeFromStats(stats api.ObjectStats) (int64, error) { + if stats.Metadata == nil { + return stats.Mtime, nil + } + clientMtime, hasClientMtime := stats.Metadata.Get(ClientMtimeMetadataKey) + if hasClientMtime { + // parse + return strconv.ParseInt(clientMtime, 10, 64) + } + return stats.Mtime, nil +} + +type Tasks struct { + Downloaded uint64 + Uploaded uint64 + Removed uint64 +} + +type SyncManager struct { + ctx context.Context + client *api.ClientWithResponses + httpClient *http.Client + progressBar *ProgressPool + maxParallelism int + presign bool + tasks Tasks +} + +func NewSyncManager(ctx context.Context, client *api.ClientWithResponses, maxParallelism int, presign bool) *SyncManager { + return &SyncManager{ + ctx: ctx, + client: client, + httpClient: http.DefaultClient, + progressBar: NewProgressPool(), + maxParallelism: maxParallelism, + presign: presign, + } +} + +// Sync - sync changes between remote and local directory given the Changes channel. +// For each change, will apply download, upload or delete according to the change type and change source +func (s *SyncManager) Sync(rootPath string, remote *uri.URI, changeSet <-chan *Change) error { + s.progressBar.Start() + defer s.progressBar.Stop() + + ch := make(chan bool, s.maxParallelism) + for i := 0; i < s.maxParallelism; i++ { + ch <- true + } + + wg, ctx := errgroup.WithContext(s.ctx) + done := false + for change := range changeSet { + <-ch // block until we have a slot + c := change + select { + case <-ctx.Done(): + done = true + default: + wg.Go(func() error { + return s.apply(ctx, rootPath, remote, c) + }) + } + ch <- true // release + if done { + break + } + } + + if err := wg.Wait(); err != nil { + return err + } + _, err := fileutil.PruneEmptyDirectories(rootPath) + return err +} + +func (s *SyncManager) apply(ctx context.Context, rootPath string, remote *uri.URI, change *Change) error { + switch change.Type { + case ChangeTypeAdded, ChangeTypeModified: + switch change.Source { + case ChangeSourceRemote: + // remote changed something, download it! + return s.download(ctx, rootPath, remote, change) + case ChangeSourceLocal: + default: + panic("not implemented") + } + case ChangeTypeRemoved: + panic("not implemented") + case ChangeTypeConflict: + return ErrConflict + default: + panic("invalid change type") + } + return nil +} + +func (s *SyncManager) download(ctx context.Context, rootPath string, remote *uri.URI, change *Change) error { + destination := filepath.Join(rootPath, change.Path) + destinationDirectory := filepath.Dir(destination) + if err := os.MkdirAll(destinationDirectory, DefaultDirectoryMask); err != nil { + return err + } + + statResp, err := s.client.StatObjectWithResponse(ctx, remote.Repository, remote.Ref, &api.StatObjectParams{ + Path: filepath.ToSlash(filepath.Join(remote.GetPath(), change.Path)), + Presign: swag.Bool(s.presign), + UserMetadata: swag.Bool(true), + }) + if err != nil { + return err + } + if statResp.StatusCode() != http.StatusOK { + return fmt.Errorf("%s (stat HTTP %d): %w", change.Path, statResp.StatusCode(), ErrDownloadingFile) + } + // get mtime + mtimeSecs, err := getMtimeFromStats(*statResp.JSON200) + if err != nil { + return err + } + + if strings.HasSuffix(change.Path, uri.PathSeparator) { + // Directory marker - skip + return nil + } + + lastModified := time.Unix(mtimeSecs, 0) + sizeBytes := swag.Int64Value(statResp.JSON200.SizeBytes) + f, err := os.Create(destination) + if err != nil { + // sometimes we get a file that is actually a directory marker. + // spark loves writing those. If we already have the directory we can skip it. + if errors.Is(err, syscall.EISDIR) && sizeBytes == 0 { + return nil // no further action required! + } + return fmt.Errorf("could not create file '%s': %w", destination, err) + } + defer func() { + err = f.Close() + }() + + if sizeBytes == 0 { // if size is empty just create file + spinner := s.progressBar.AddSpinner(fmt.Sprintf("download %s", change.Path)) + defer spinner.Done() + } else { // Download file + // make request + var body io.Reader + if s.presign { + resp, err := s.httpClient.Get(statResp.JSON200.PhysicalAddress) + if err != nil { + return err + } + defer func() { + _ = resp.Body.Close() + }() + if resp.StatusCode != http.StatusOK { + return fmt.Errorf("%s (pre-signed GET: HTTP %d): %w", change.Path, resp.StatusCode, ErrDownloadingFile) + } + body = resp.Body + } else { + resp, err := s.client.GetObject(ctx, remote.Repository, remote.Ref, &api.GetObjectParams{ + Path: filepath.ToSlash(filepath.Join(remote.GetPath(), change.Path)), + }) + if err != nil { + return err + } + defer func() { + _ = resp.Body.Close() + }() + if resp.StatusCode != http.StatusOK { + return fmt.Errorf("%s (GetObject: HTTP %d): %w", change.Path, resp.StatusCode, ErrDownloadingFile) + } + body = resp.Body + } + + b := s.progressBar.AddReader(fmt.Sprintf("download %s", change.Path), sizeBytes) + barReader := b.Reader(body) + defer func() { + if err != nil { + b.Error() + } else { + b.Done() + } + }() + _, err = io.Copy(f, barReader) + + if err != nil { + return fmt.Errorf("could not write file '%s': %w", destination, err) + } + } + + atomic.AddUint64(&s.tasks.Downloaded, 1) + // set mtime to the server returned one + err = os.Chtimes(destination, time.Now(), lastModified) // Explicit to catch in defer func + return err +} + +func (s *SyncManager) upload(rootPath string, remote *uri.URI, change *Change) error { //nolint:unused + panic("Not Implemented") +} + +func (s *SyncManager) deleteLocal(rootPath string, change *Change) error { //nolint:unused + panic("Not Implemented") +} + +func (s *SyncManager) deleteRemote(remote *uri.URI, change *Change) error { //nolint:unused + panic("Not Implemented") +} + +func (s *SyncManager) Summary() Tasks { + return s.tasks +} diff --git a/pkg/uri/parser.go b/pkg/uri/parser.go index 76fc7e7a4d6..9b489d4e603 100644 --- a/pkg/uri/parser.go +++ b/pkg/uri/parser.go @@ -57,6 +57,15 @@ func (u *URI) GetPath() string { return *u.Path } +// WithRef returns a new URI from u replacing the Reference part with the given ref +func (u *URI) WithRef(ref string) *URI { + return &URI{ + Repository: u.Repository, + Ref: ref, + Path: u.Path, + } +} + func (u *URI) String() string { var buf strings.Builder buf.WriteString(LakeFSSchema)