From 5cc5966186bd7ada1b4d8f5b927c7288ea2a021b Mon Sep 17 00:00:00 2001 From: Anwar Hidayat <15167551+irainia@users.noreply.github.com> Date: Fri, 18 Aug 2023 11:24:37 +0700 Subject: [PATCH] feat: allow granular control on batch size under resource recipe --- cmd/execute.go | 17 +++++------------ cmd/profile.go | 4 ++-- cmd/resource.go | 2 +- core/core.go | 10 ++-------- core/core_test.go | 28 ++++------------------------ core/loader.go | 3 +++ docs/command.md | 1 - docs/recipe.md | 7 +++++++ recipe/recipe.go | 1 + recipe/validator_test.go | 5 +++++ 10 files changed, 30 insertions(+), 48 deletions(-) diff --git a/cmd/execute.go b/cmd/execute.go index d182795..d0ad9a8 100644 --- a/cmd/execute.go +++ b/cmd/execute.go @@ -15,33 +15,26 @@ import ( "github.com/spf13/cobra" ) -const ( - defaultBatchSize = 4 - defaultProgressType = "progressive" -) +const defaultProgressType = "progressive" -var ( - batchSize int - progressType string -) +var progressType string func getExecuteCmd() *cobra.Command { runCmd := &cobra.Command{ Use: "execute", Short: "Execute pipeline based on the specified recipe", RunE: func(cmd *cobra.Command, args []string) error { - return executePipeline(recipePath, progressType, batchSize, nil) + return executePipeline(recipePath, progressType, nil) }, } runCmd.PersistentFlags().StringVarP(&recipePath, "recipe-path", "R", defaultRecipePath, "Path of the recipe file") - runCmd.PersistentFlags().IntVarP(&batchSize, "batch-size", "B", defaultBatchSize, "Batch size for one process") runCmd.PersistentFlags().StringVarP(&progressType, "progress-type", "P", defaultProgressType, "Progress type to be used") runCmd.AddCommand(getResourceCmd()) return runCmd } -func executePipeline(recipePath, progressType string, batchSize int, enrich func(*recipe.Recipe) error) error { +func executePipeline(recipePath, progressType string, enrich func(*recipe.Recipe) error) error { rcp, err := loadRecipe(recipePath, defaultRecipeType, defaultRecipeFormat) if err != nil { return err @@ -59,7 +52,7 @@ func executePipeline(recipePath, progressType string, batchSize int, enrich func return err } evaluate := getEvaluate() - pipeline, err := core.NewPipeline(rcp, evaluate, batchSize, newProgress) + pipeline, err := core.NewPipeline(rcp, evaluate, newProgress) if err != nil { return err } diff --git a/cmd/profile.go b/cmd/profile.go index 89fdb1e..4a8d5bd 100644 --- a/cmd/profile.go +++ b/cmd/profile.go @@ -37,12 +37,12 @@ func getProfileCmd() *cobra.Command { func getResourceTable(rcp *recipe.Recipe) *tablewriter.Table { table := tablewriter.NewWriter(os.Stdout) - table.SetHeader([]string{"Name", "Format", "Type", "Path", "Framework"}) + table.SetHeader([]string{"Name", "Format", "Type", "Path", "Batch Size", "Framework"}) table.SetAutoMergeCells(true) table.SetRowLine(true) for _, r := range rcp.Resources { for _, frameworkName := range r.FrameworkNames { - table.Append([]string{r.Name, r.Format, r.Type, r.Path, frameworkName}) + table.Append([]string{r.Name, r.Format, r.Type, r.Path, fmt.Sprintf("%d", r.BatchSize), frameworkName}) } } return table diff --git a/cmd/resource.go b/cmd/resource.go index f6bc4a5..4aad636 100644 --- a/cmd/resource.go +++ b/cmd/resource.go @@ -34,7 +34,7 @@ func getResourceCmd() *cobra.Command { Path: path, }) } - return executePipeline(recipePath, progressType, batchSize, enrich) + return executePipeline(recipePath, progressType, enrich) }, } resourceCmd.Flags().StringVarP(&name, "name", "n", "", "name of the resource recipe to be used") diff --git a/core/core.go b/core/core.go index 4f4c15c..2a45442 100644 --- a/core/core.go +++ b/core/core.go @@ -39,7 +39,6 @@ type Pipeline struct { recipe *recipe.Recipe loader *Loader evaluate model.Evaluate - batchSize int newProgress model.NewProgress nameToFrameworkRecipe map[string]*recipe.Framework @@ -49,7 +48,6 @@ type Pipeline struct { func NewPipeline( rcp *recipe.Recipe, evaluate model.Evaluate, - batchSize int, newProgress model.NewProgress, ) (*Pipeline, error) { if rcp == nil { @@ -58,9 +56,6 @@ func NewPipeline( if evaluate == nil { return nil, errors.New("evaluate function is nil") } - if batchSize < 0 { - return nil, errors.New("batch size should be at least zero") - } if newProgress == nil { return nil, errors.New("new progress function is nil") } @@ -72,7 +67,6 @@ func NewPipeline( recipe: rcp, loader: &Loader{}, evaluate: evaluate, - batchSize: batchSize, newProgress: newProgress, nameToFrameworkRecipe: nameToFrameworkRecipe, }, nil @@ -152,8 +146,8 @@ func (p *Pipeline) executeOnResource(resourceRcp *recipe.Resource, nameToValidat } progress := p.newProgress(resourceRcp.Name, len(resourcePaths)) - batch := p.batchSize - if batch == 0 || batch >= len(resourcePaths) { + batch := resourceRcp.BatchSize + if batch <= 0 || batch > len(resourcePaths) { batch = len(resourcePaths) } counter := 0 diff --git a/core/core_test.go b/core/core_test.go index 1fc6524..3c5d1c1 100644 --- a/core/core_test.go +++ b/core/core_test.go @@ -16,12 +16,11 @@ func TestNewPipeline(t *testing.T) { var evaluate model.Evaluate = func(name, snippet string) (string, error) { return "", nil } - var batchSize = 0 var newProgress model.NewProgress = func(name string, total int) model.Progress { return nil } - actualPipeline, actualErr := core.NewPipeline(rcp, evaluate, batchSize, newProgress) + actualPipeline, actualErr := core.NewPipeline(rcp, evaluate, newProgress) assert.Nil(t, actualPipeline) assert.NotNil(t, actualErr) @@ -30,28 +29,11 @@ func TestNewPipeline(t *testing.T) { t.Run("should return nil and error if evaluate is nil", func(t *testing.T) { var rcp *recipe.Recipe = &recipe.Recipe{} var evaluate model.Evaluate = nil - var batchSize = 0 var newProgress model.NewProgress = func(name string, total int) model.Progress { return nil } - actualPipeline, actualErr := core.NewPipeline(rcp, evaluate, batchSize, newProgress) - - assert.Nil(t, actualPipeline) - assert.NotNil(t, actualErr) - }) - - t.Run("should return nil and error if batchSize is less than zero", func(t *testing.T) { - var rcp *recipe.Recipe = &recipe.Recipe{} - var evaluate model.Evaluate = func(name, snippet string) (string, error) { - return "", nil - } - var batchSize = -1 - var newProgress model.NewProgress = func(name string, total int) model.Progress { - return nil - } - - actualPipeline, actualErr := core.NewPipeline(rcp, evaluate, batchSize, newProgress) + actualPipeline, actualErr := core.NewPipeline(rcp, evaluate, newProgress) assert.Nil(t, actualPipeline) assert.NotNil(t, actualErr) @@ -62,10 +44,9 @@ func TestNewPipeline(t *testing.T) { var evaluate model.Evaluate = func(name, snippet string) (string, error) { return "", nil } - var batchSize = 0 var newProgress model.NewProgress = nil - actualPipeline, actualErr := core.NewPipeline(rcp, evaluate, batchSize, newProgress) + actualPipeline, actualErr := core.NewPipeline(rcp, evaluate, newProgress) assert.Nil(t, actualPipeline) assert.NotNil(t, actualErr) @@ -76,12 +57,11 @@ func TestNewPipeline(t *testing.T) { var evaluate model.Evaluate = func(name, snippet string) (string, error) { return "", nil } - var batchSize = 0 var newProgress model.NewProgress = func(name string, total int) model.Progress { return nil } - actualPipeline, actualErr := core.NewPipeline(rcp, evaluate, batchSize, newProgress) + actualPipeline, actualErr := core.NewPipeline(rcp, evaluate, newProgress) assert.NotNil(t, actualPipeline) assert.Nil(t, actualErr) diff --git a/core/loader.go b/core/loader.go index 6635698..7beb4af 100644 --- a/core/loader.go +++ b/core/loader.go @@ -142,6 +142,9 @@ func (l *Loader) LoadSchema(rcp *recipe.Schema) (*model.Schema, error) { return nil, fmt.Errorf("[%s] schema for recipe [%s] cannot be found", jsonFormat, rcp.Name) } data, err := l.LoadData(paths[0], rcp.Type, jsonFormat) + if err != nil { + return nil, err + } return &model.Schema{ Name: rcp.Name, Data: data, diff --git a/docs/command.md b/docs/command.md index 18fb094..f05ecb2 100644 --- a/docs/command.md +++ b/docs/command.md @@ -77,7 +77,6 @@ Running the above command will execute all frameworks under `valor.yaml` recipe. Flag | Description | Format --- | --- | --- ---batch-size | specify the number of data to be executed paralelly in one batch | it should be an integer more than 0 (zero). it is optional with default value 4 (four). --progress-type | specify the progress type during execution | currently available: `progressive` (default) and `iterative` --recipe-path | customize the recipe that will be executed | it is optional. the value should be a valid recipe path diff --git a/docs/recipe.md b/docs/recipe.md index ce96ccf..2351ed7 100644 --- a/docs/recipe.md +++ b/docs/recipe.md @@ -16,6 +16,7 @@ resources: type: file path: ./example/resource format: json + batch_size: 3 framework_names: - user_account_evaluation ... @@ -65,6 +66,12 @@ Each resource is defined by a structure with the following fields: