Skip to content

Commit

Permalink
Implement lakectl local clone (#6320)
Browse files Browse the repository at this point in the history
* Implement lakectl local clone

* Fix unit test

* CR Fixes

* More fixes
  • Loading branch information
N-o-Z authored Aug 9, 2023
1 parent 419990e commit 5311265
Show file tree
Hide file tree
Showing 15 changed files with 809 additions and 51 deletions.
31 changes: 31 additions & 0 deletions cmd/lakectl/cmd/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,37 @@ import (
"github.com/spf13/cobra"
)

const (
localDefaultSyncParallelism = 25
localDefaultSyncPresign = true
)

func withParallelismFlag(cmd *cobra.Command) {
cmd.Flags().IntP("parallelism", "p", localDefaultSyncParallelism,
"Max concurrent operations to perform")
}

func withPresignFlag(cmd *cobra.Command) {
cmd.Flags().Bool("presign", localDefaultSyncPresign,
"Use pre-signed URLs when downloading/uploading data (recommended)")
}

func withLocalSyncFlags(cmd *cobra.Command) {
withParallelismFlag(cmd)
withPresignFlag(cmd)
}

type syncFlags struct {
parallelism int
presign bool
}

func getLocalSyncFlags(cmd *cobra.Command) syncFlags {
parallelism := Must(cmd.Flags().GetInt("parallelism"))
presign := Must(cmd.Flags().GetBool("presign"))
return syncFlags{parallelism: parallelism, presign: presign}
}

var localCmd = &cobra.Command{
Use: "local",
// TODO: Remove BETA when feature complete
Expand Down
102 changes: 102 additions & 0 deletions cmd/lakectl/cmd/local_clone.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
package cmd

import (
"errors"
"fmt"
"io/fs"
"net/http"
"path/filepath"
"strings"

"github.com/go-openapi/swag"
"github.com/spf13/cobra"
"github.com/treeverse/lakefs/pkg/api"
"github.com/treeverse/lakefs/pkg/fileutil"
"github.com/treeverse/lakefs/pkg/local"
"github.com/treeverse/lakefs/pkg/uri"
)

const (
localCloneMinArgs = 1
localCloneMaxArgs = 2
filesChanSize = 1000
)

var localCloneCmd = &cobra.Command{
Use: "clone <path uri> [directory]",
Short: "Clone a path from a lakeFS repository into a new directory.",
Args: cobra.RangeArgs(localCloneMinArgs, localCloneMaxArgs),
Run: func(cmd *cobra.Command, args []string) {
remote := MustParsePathURI("path", args[0])
dir := "."
if len(args) == localCloneMaxArgs {
dir = args[1]
}
syncFlags := getLocalSyncFlags(cmd)
localPath, err := filepath.Abs(dir)
if err != nil {
DieErr(err)
}

empty, err := fileutil.IsDirEmpty(localPath)
if err != nil {
if !errors.Is(err, fs.ErrNotExist) {
DieErr(err)
}
} else if !empty {
DieFmt("directory '%s' exists and is not empty", localPath)
}

idx, err := localInit(cmd.Context(), localPath, remote, false)
if err != nil {
DieErr(err)
}
stableRemote := remote.WithRef(idx.AtHead)
client := getClient()
// Dynamically construct changes
c := make(chan *local.Change, filesChanSize)
go func() {
defer close(c)
hasMore := true
var after string
for hasMore {
listResp, err := client.ListObjectsWithResponse(cmd.Context(), remote.Repository, stableRemote.Ref, &api.ListObjectsParams{
After: (*api.PaginationAfter)(swag.String(after)),
Prefix: (*api.PaginationPrefix)(remote.Path),
UserMetadata: swag.Bool(true),
})
DieOnErrorOrUnexpectedStatusCode(listResp, err, http.StatusOK)

for _, o := range listResp.JSON200.Results {
path := strings.TrimPrefix(o.Path, remote.GetPath())
// skip directory markers
if path == "" || (strings.HasSuffix(path, uri.PathSeparator) && swag.Int64Value(o.SizeBytes) == 0) {
continue
}
path = strings.TrimPrefix(path, uri.PathSeparator)
c <- &local.Change{
Source: local.ChangeSourceRemote,
Path: path,
Type: local.ChangeTypeAdded,
}
}
hasMore = listResp.JSON200.Pagination.HasMore
after = listResp.JSON200.Pagination.NextOffset
}
}()

s := local.NewSyncManager(cmd.Context(), client, syncFlags.parallelism, syncFlags.presign)
err = s.Sync(localPath, stableRemote, c)

if err != nil {
DieErr(err)
}
fmt.Printf("Successfully cloned %s to %s.\nTotal objects downloaded:\t%d\n", remote, localPath, s.Summary().Downloaded)
},
}

//nolint:gochecknoinits
func init() {
withLocalSyncFlags(localCloneCmd)
localCmd.AddCommand(localCloneCmd)
}
64 changes: 38 additions & 26 deletions cmd/lakectl/cmd/local_init.go
Original file line number Diff line number Diff line change
@@ -1,61 +1,74 @@
package cmd

import (
"context"
"errors"
"fmt"
"io/fs"
"os"
"path/filepath"

"github.com/spf13/cobra"
"github.com/treeverse/lakefs/pkg/git"
"github.com/treeverse/lakefs/pkg/local"
"github.com/treeverse/lakefs/pkg/uri"
)

const (
localInitMinArgs = 1
localInitMaxArgs = 2
)

func localInit(ctx context.Context, dir string, remote *uri.URI, force bool) (*local.Index, error) {
if err := os.MkdirAll(dir, os.ModePerm); err != nil {
DieErr(err)
}
exists, err := local.IndexExists(dir)
if err != nil {
return nil, err
}
if exists && !force {
return nil, fs.ErrExist
}

// dereference
head := resolveCommitOrDie(ctx, getClient(), remote.Repository, remote.Ref)
idx, err := local.WriteIndex(dir, remote, head)
if err != nil {
return nil, err
}

ignoreFile, err := git.Ignore(dir, []string{dir}, []string{filepath.Join(dir, local.IndexFileName)}, local.IgnoreMarker)
if err == nil {
fmt.Println("location added to", ignoreFile)
} else if !errors.Is(err, git.ErrNotARepository) {
return nil, err
}

return idx, nil
}

var localInitCmd = &cobra.Command{
Use: "init <path uri> [directory]",
Short: "set a local directory to sync with a lakeFS path",
Short: "set a local directory to sync with a lakeFS path.",
Args: cobra.RangeArgs(localInitMinArgs, localInitMaxArgs),
Run: func(cmd *cobra.Command, args []string) {
remote := MustParsePathURI("path", args[0])
dir := "."
if len(args) == localInitMaxArgs {
dir = args[1]
}
flagSet := cmd.Flags()
force := Must(flagSet.GetBool("force"))

localPath, err := filepath.Abs(dir)
if err != nil {
DieErr(err)
}
force := Must(cmd.Flags().GetBool("force"))

if err := os.MkdirAll(dir, os.ModePerm); err != nil {
DieErr(err)
}
exists, err := local.IndexExists(localPath)
if err != nil {
DieErr(err)
}
if exists && !force {
DieFmt("directory '%s' already linked to a lakefs path, run command with --force to overwrite", localPath)
}

// dereference
head := resolveCommitOrDie(cmd.Context(), getClient(), remote.Repository, remote.Ref)
err = local.WriteIndex(localPath, remote, head)
_, err = localInit(cmd.Context(), localPath, remote, force)
if err != nil {
DieErr(err)
}

ignoreFile, err := git.Ignore(localPath, []string{localPath, local.IndexFileName}, []string{local.IndexFileName}, local.IgnoreMarker)
if err == nil {
fmt.Println("location added to", ignoreFile)
} else if !errors.Is(err, git.ErrNotARepository) {
if errors.Is(err, fs.ErrExist) {
DieFmt("directory '%s' already linked to a lakeFS path, run command with --force to overwrite", dir)
}
DieErr(err)
}

Expand All @@ -65,7 +78,6 @@ var localInitCmd = &cobra.Command{

//nolint:gochecknoinits
func init() {
AssignAutoConfirmFlag(localInitCmd.Flags())
localInitCmd.Flags().Bool("force", false, "Overwrites if directory already linked to a lakeFS path")
localCmd.AddCommand(localInitCmd)
}
2 changes: 1 addition & 1 deletion cmd/lakectl/cmd/local_list.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ const (

var localListCmd = &cobra.Command{
Use: "list [directory]",
Short: "find and list directories that are synced with lakeFS",
Short: "find and list directories that are synced with lakeFS.",
Args: cobra.RangeArgs(localListMinArgs, localListMaxArgs),
Run: func(cmd *cobra.Command, args []string) {
dir := "."
Expand Down
24 changes: 21 additions & 3 deletions docs/reference/cli.md
Original file line number Diff line number Diff line change
Expand Up @@ -2574,6 +2574,25 @@ BETA: sync local directories with lakeFS paths



### lakectl local clone

Clone a path from a lakeFS repository into a new directory.

```
lakectl local clone <path uri> [directory] [flags]
```

#### Options
{:.no_toc}

```
-h, --help help for clone
-p, --parallelism int Max concurrent operations to perform (default 25)
--presign Use pre-signed URLs when downloading/uploading data (recommended) (default true)
```



### lakectl local help

Help about any command
Expand All @@ -2599,7 +2618,7 @@ lakectl local help [command] [flags]

### lakectl local init

set a local directory to sync with a lakeFS path
set a local directory to sync with a lakeFS path.

```
lakectl local init <path uri> [directory] [flags]
Expand All @@ -2611,14 +2630,13 @@ lakectl local init <path uri> [directory] [flags]
```
--force Overwrites if directory already linked to a lakeFS path
-h, --help help for init
-y, --yes Automatically say yes to all confirmations
```



### lakectl local list

find and list directories that are synced with lakeFS
find and list directories that are synced with lakeFS.

```
lakectl local list [directory] [flags]
Expand Down
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ require (
github.com/hashicorp/go-retryablehttp v0.7.2
github.com/hashicorp/go-version v1.6.0
github.com/jackc/pgx/v5 v5.3.1
github.com/karrick/godirwalk v1.17.0
github.com/puzpuzpuz/xsync v1.5.2
go.uber.org/ratelimit v0.2.0
)
Expand Down Expand Up @@ -142,7 +143,7 @@ require (
github.com/docker/cli v23.0.6+incompatible // indirect
github.com/docker/docker v23.0.6+incompatible // indirect
github.com/docker/go-connections v0.4.0 // indirect
github.com/docker/go-units v0.5.0
github.com/docker/go-units v0.5.0 // indirect
github.com/dustin/go-humanize v1.0.0 // indirect
github.com/fatih/color v1.15.0 // indirect
github.com/ghodss/yaml v1.0.0 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -506,6 +506,8 @@ github.com/jung-kurt/gofpdf v1.0.0/go.mod h1:7Id9E/uU8ce6rXgefFLlgrJj/GYY22cpxn+
github.com/jung-kurt/gofpdf v1.0.3-0.20190309125859-24315acbbda5/go.mod h1:7Id9E/uU8ce6rXgefFLlgrJj/GYY22cpxn+r32jIOes=
github.com/k0kubun/colorstring v0.0.0-20150214042306-9440f1994b88/go.mod h1:3w7q1U84EfirKl04SVQ/s7nPm1ZPhiXd34z40TNz36k=
github.com/k0kubun/go-ansi v0.0.0-20180517002512-3bf9e2903213/go.mod h1:vNUNkEQ1e29fT/6vq2aBdFsgNPmy8qMdSay1npru+Sw=
github.com/karrick/godirwalk v1.17.0 h1:b4kY7nqDdioR/6qnbHQyDvmA17u5G1cZ6J+CZXwSWoI=
github.com/karrick/godirwalk v1.17.0/go.mod h1:j4mkqPuvaLI8mp1DroR3P6ad7cyYd4c1qeJ3RV7ULlk=
github.com/kataras/golog v0.0.9/go.mod h1:12HJgwBIZFNGL0EJnMRhmvGA0PQGx8VFwrZtM4CqbAk=
github.com/kataras/golog v0.0.10/go.mod h1:yJ8YKCmyL+nWjERB90Qwn+bdyBZsaQwU3bTVFgkFIp8=
github.com/kataras/iris/v12 v12.0.1/go.mod h1:udK4vLQKkdDqMGJJVd/msuMtN6hpYJhg/lSzuxjhO+U=
Expand Down
Loading

0 comments on commit 5311265

Please sign in to comment.