-
Notifications
You must be signed in to change notification settings - Fork 355
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
lakectl cmds split and simplify (#6260)
- Loading branch information
Showing
116 changed files
with
4,497 additions
and
3,612 deletions.
There are no files selected for viewing
Large diffs are not rendered by default.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,82 @@ | ||
package cmd | ||
|
||
import ( | ||
"fmt" | ||
"math/rand" | ||
"net/http" | ||
"os" | ||
"strconv" | ||
"syscall" | ||
"time" | ||
|
||
"github.com/spf13/cobra" | ||
"github.com/treeverse/lakefs/pkg/api" | ||
"github.com/treeverse/lakefs/pkg/api/helpers" | ||
"github.com/treeverse/lakefs/pkg/testutil/stress" | ||
) | ||
|
||
var abuseCommitCmd = &cobra.Command{ | ||
Use: "commit <source ref uri>", | ||
Short: "Commits to the source ref repeatedly", | ||
Hidden: false, | ||
Args: cobra.ExactArgs(1), | ||
ValidArgsFunction: ValidArgsRepository, | ||
Run: func(cmd *cobra.Command, args []string) { | ||
u := MustParseRefURI("source ref", args[0]) | ||
amount := Must(cmd.Flags().GetInt("amount")) | ||
gapDuration := Must(cmd.Flags().GetDuration("gap")) | ||
|
||
fmt.Println("Source branch:", u) | ||
|
||
generator := stress.NewGenerator("commit", 1, stress.WithSignalHandlersFor(os.Interrupt, syscall.SIGTERM)) | ||
|
||
// generate randomly selected keys as input | ||
rand.Seed(time.Now().Unix()) | ||
generator.Setup(func(add stress.GeneratorAddFn) { | ||
for i := 0; i < amount; i++ { | ||
add(strconv.Itoa(i + 1)) | ||
} | ||
}) | ||
|
||
// generate randomly selected keys as input | ||
client := getClient() | ||
resp, err := client.GetRepositoryWithResponse(cmd.Context(), u.Repository) | ||
DieOnErrorOrUnexpectedStatusCode(resp, err, http.StatusOK) | ||
if resp.JSON200 == nil { | ||
Die("Bad response from server", 1) | ||
} | ||
|
||
// execute the things! | ||
generator.Run(func(input chan string, output chan stress.Result) { | ||
ctx := cmd.Context() | ||
client := getClient() | ||
for work := range input { | ||
start := time.Now() | ||
resp, err := client.CommitWithResponse(ctx, u.Repository, u.Ref, &api.CommitParams{}, | ||
api.CommitJSONRequestBody(api.CommitCreation{Message: work})) | ||
if err == nil && resp.StatusCode() != http.StatusOK { | ||
err = helpers.ResponseAsError(resp) | ||
} | ||
output <- stress.Result{ | ||
Error: err, | ||
Took: time.Since(start), | ||
} | ||
select { | ||
case <-ctx.Done(): | ||
return | ||
case <-time.After(gapDuration): | ||
} | ||
} | ||
}) | ||
}, | ||
} | ||
|
||
//nolint:gochecknoinits | ||
func init() { | ||
const defaultGap = 2 * time.Second | ||
|
||
abuseCommitCmd.Flags().Int("amount", abuseDefaultParallelism, "amount of commits to do") | ||
abuseCommitCmd.Flags().Duration("gap", defaultGap, "duration to wait between commits") | ||
|
||
abuseCmd.AddCommand(abuseCommitCmd) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,117 @@ | ||
package cmd | ||
|
||
import ( | ||
"fmt" | ||
"net/http" | ||
"os" | ||
"strings" | ||
"syscall" | ||
"time" | ||
|
||
"github.com/spf13/cobra" | ||
"github.com/treeverse/lakefs/pkg/api" | ||
"github.com/treeverse/lakefs/pkg/api/helpers" | ||
"github.com/treeverse/lakefs/pkg/testutil/stress" | ||
) | ||
|
||
var abuseCreateBranchesCmd = &cobra.Command{ | ||
Use: "create-branches <source ref uri>", | ||
Short: "Create a lot of branches very quickly.", | ||
Hidden: false, | ||
Args: cobra.ExactArgs(1), | ||
ValidArgsFunction: ValidArgsRepository, | ||
Run: func(cmd *cobra.Command, args []string) { | ||
u := MustParseRefURI("source ref", args[0]) | ||
cleanOnly := Must(cmd.Flags().GetBool("clean-only")) | ||
branchPrefix := Must(cmd.Flags().GetString("branch-prefix")) | ||
amount := Must(cmd.Flags().GetInt("amount")) | ||
parallelism := Must(cmd.Flags().GetInt("parallelism")) | ||
|
||
client := getClient() | ||
|
||
fmt.Println("Source ref:", u) | ||
deleteGen := stress.NewGenerator("delete branch", parallelism) | ||
|
||
const paginationAmount = 1000 | ||
deleteGen.Setup(func(add stress.GeneratorAddFn) { | ||
currentOffset := api.PaginationAfter(branchPrefix) | ||
amount := api.PaginationAmount(paginationAmount) | ||
for { | ||
resp, err := client.ListBranchesWithResponse(cmd.Context(), u.Repository, &api.ListBranchesParams{ | ||
After: ¤tOffset, | ||
Amount: &amount, | ||
}) | ||
DieOnErrorOrUnexpectedStatusCode(resp, err, http.StatusOK) | ||
if resp.JSON200 == nil { | ||
Die("Bad response from server", 1) | ||
} | ||
|
||
for _, ref := range resp.JSON200.Results { | ||
if !strings.HasPrefix(ref.Id, branchPrefix) { | ||
return | ||
} | ||
add(ref.Id) // this branch should be deleted! | ||
} | ||
pagination := resp.JSON200.Pagination | ||
if !pagination.HasMore { | ||
return | ||
} | ||
currentOffset = api.PaginationAfter(pagination.NextOffset) | ||
} | ||
}) | ||
|
||
// wait for deletes to end | ||
deleteGen.Run(func(input chan string, output chan stress.Result) { | ||
for branch := range input { | ||
start := time.Now() | ||
_, err := client.DeleteBranchWithResponse(cmd.Context(), u.Repository, branch) | ||
output <- stress.Result{ | ||
Error: err, | ||
Took: time.Since(start), | ||
} | ||
} | ||
}) | ||
|
||
if cleanOnly { | ||
return // done. | ||
} | ||
|
||
// start creating branches | ||
generator := stress.NewGenerator("create branch", parallelism, stress.WithSignalHandlersFor(os.Interrupt, syscall.SIGTERM)) | ||
|
||
// generate create branch requests | ||
generator.Setup(func(add stress.GeneratorAddFn) { | ||
for i := 0; i < amount; i++ { | ||
add(fmt.Sprintf("%s-%d", branchPrefix, i)) | ||
} | ||
}) | ||
|
||
generator.Run(func(input chan string, output chan stress.Result) { | ||
ctx := cmd.Context() | ||
for branch := range input { | ||
start := time.Now() | ||
resp, err := client.CreateBranchWithResponse( | ||
ctx, u.Repository, api.CreateBranchJSONRequestBody{ | ||
Name: branch, | ||
Source: u.Ref, | ||
}) | ||
if err == nil && resp.StatusCode() != http.StatusCreated { | ||
err = helpers.ResponseAsError(resp) | ||
} | ||
output <- stress.Result{ | ||
Error: err, | ||
Took: time.Since(start), | ||
} | ||
} | ||
}) | ||
}, | ||
} | ||
|
||
//nolint:gochecknoinits | ||
func init() { | ||
abuseCmd.AddCommand(abuseCreateBranchesCmd) | ||
abuseCreateBranchesCmd.Flags().String("branch-prefix", "abuse-", "prefix to create branches under") | ||
abuseCreateBranchesCmd.Flags().Bool("clean-only", false, "only clean up past runs") | ||
abuseCreateBranchesCmd.Flags().Int("amount", abuseDefaultAmount, "amount of things to do") | ||
abuseCreateBranchesCmd.Flags().Int("parallelism", abuseDefaultParallelism, "amount of things to do in parallel") | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,89 @@ | ||
package cmd | ||
|
||
import ( | ||
"fmt" | ||
"os" | ||
"syscall" | ||
"time" | ||
|
||
"github.com/spf13/cobra" | ||
"github.com/treeverse/lakefs/pkg/api" | ||
"github.com/treeverse/lakefs/pkg/api/helpers" | ||
"github.com/treeverse/lakefs/pkg/testutil/stress" | ||
) | ||
|
||
var abuseLinkSameObjectCmd = &cobra.Command{ | ||
Use: "link-same-object <source ref uri>", | ||
Short: "Link the same object in parallel.", | ||
Hidden: false, | ||
Args: cobra.ExactArgs(1), | ||
ValidArgsFunction: ValidArgsRepository, | ||
Run: func(cmd *cobra.Command, args []string) { | ||
u := MustParseRefURI("source ref", args[0]) | ||
amount := Must(cmd.Flags().GetInt("amount")) | ||
parallelism := Must(cmd.Flags().GetInt("parallelism")) | ||
key := Must(cmd.Flags().GetString("key")) | ||
|
||
fmt.Println("Source ref:", u) | ||
fmt.Println("Object key:", key) | ||
|
||
generator := stress.NewGenerator("get-and-link", parallelism, stress.WithSignalHandlersFor(os.Interrupt, syscall.SIGTERM)) | ||
|
||
// setup generator to use the key | ||
generator.Setup(func(add stress.GeneratorAddFn) { | ||
for i := 0; i < amount; i++ { | ||
add(key) | ||
} | ||
}) | ||
|
||
// execute the things! | ||
generator.Run(func(input chan string, output chan stress.Result) { | ||
ctx := cmd.Context() | ||
client := getClient() | ||
for work := range input { | ||
start := time.Now() | ||
|
||
getResponse, err := client.GetPhysicalAddressWithResponse(ctx, u.Repository, u.Ref, &api.GetPhysicalAddressParams{Path: work}) | ||
if err == nil && getResponse.JSON200 == nil { | ||
err = helpers.ResponseAsError(getResponse) | ||
} | ||
if err != nil { | ||
output <- stress.Result{ | ||
Error: err, | ||
Took: time.Since(start), | ||
} | ||
continue | ||
} | ||
|
||
stagingLocation := getResponse.JSON200 | ||
linkResponse, err := client.LinkPhysicalAddressWithResponse(ctx, u.Repository, u.Ref, | ||
&api.LinkPhysicalAddressParams{ | ||
Path: work, | ||
}, | ||
api.LinkPhysicalAddressJSONRequestBody{ | ||
Checksum: "00695c7307b0480c7b6bdc873cf05c15", | ||
Staging: api.StagingLocation{ | ||
PhysicalAddress: stagingLocation.PhysicalAddress, | ||
Token: stagingLocation.Token, | ||
}, | ||
UserMetadata: nil, | ||
}) | ||
if err == nil && linkResponse.JSON200 == nil { | ||
err = helpers.ResponseAsError(linkResponse) | ||
} | ||
output <- stress.Result{ | ||
Error: err, | ||
Took: time.Since(start), | ||
} | ||
} | ||
}) | ||
}, | ||
} | ||
|
||
//nolint:gochecknoinits | ||
func init() { | ||
abuseCmd.AddCommand(abuseLinkSameObjectCmd) | ||
abuseLinkSameObjectCmd.Flags().Int("amount", abuseDefaultAmount, "amount of link object to do") | ||
abuseLinkSameObjectCmd.Flags().Int("parallelism", abuseDefaultParallelism, "amount of link object to do in parallel") | ||
abuseLinkSameObjectCmd.Flags().String("key", "linked-object", "key used for the test") | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,67 @@ | ||
package cmd | ||
|
||
import ( | ||
"math/rand" | ||
"net/http" | ||
"os" | ||
"strconv" | ||
"syscall" | ||
"time" | ||
|
||
"github.com/spf13/cobra" | ||
"github.com/treeverse/lakefs/pkg/api" | ||
"github.com/treeverse/lakefs/pkg/api/helpers" | ||
"github.com/treeverse/lakefs/pkg/testutil/stress" | ||
) | ||
|
||
var abuseListCmd = &cobra.Command{ | ||
Use: "list <source ref uri>", | ||
Short: "List from the source ref", | ||
Hidden: false, | ||
Args: cobra.ExactArgs(1), | ||
ValidArgsFunction: ValidArgsRepository, | ||
Run: func(cmd *cobra.Command, args []string) { | ||
u := MustParseRefURI("source ref", args[0]) | ||
amount := Must(cmd.Flags().GetInt("amount")) | ||
parallelism := Must(cmd.Flags().GetInt("parallelism")) | ||
prefix := Must(cmd.Flags().GetString("prefix")) | ||
|
||
generator := stress.NewGenerator("list", parallelism, stress.WithSignalHandlersFor(os.Interrupt, syscall.SIGTERM)) | ||
|
||
// generate randomly selected keys as input | ||
rand.Seed(time.Now().Unix()) | ||
generator.Setup(func(add stress.GeneratorAddFn) { | ||
for i := 0; i < amount; i++ { | ||
add(strconv.Itoa(i + 1)) | ||
} | ||
}) | ||
|
||
listPrefix := api.PaginationPrefix(prefix) | ||
// execute the things! | ||
generator.Run(func(input chan string, output chan stress.Result) { | ||
ctx := cmd.Context() | ||
client := getClient() | ||
for range input { | ||
start := time.Now() | ||
resp, err := client.ListObjectsWithResponse(ctx, u.Repository, u.Ref, &api.ListObjectsParams{ | ||
Prefix: &listPrefix, | ||
}) | ||
if err == nil && resp.StatusCode() != http.StatusOK { | ||
err = helpers.ResponseAsError(resp) | ||
} | ||
output <- stress.Result{ | ||
Error: err, | ||
Took: time.Since(start), | ||
} | ||
} | ||
}) | ||
}, | ||
} | ||
|
||
//nolint:gochecknoinits | ||
func init() { | ||
abuseCmd.AddCommand(abuseListCmd) | ||
abuseListCmd.Flags().String("prefix", "abuse/", "prefix to list under") | ||
abuseListCmd.Flags().Int("amount", abuseDefaultAmount, "amount of lists to do") | ||
abuseListCmd.Flags().Int("parallelism", abuseDefaultParallelism, "amount of lists to do in parallel") | ||
} |
Oops, something went wrong.