Skip to content

Commit

Permalink
chore: refactor pipelines to be goroutine-safe (#122)
Browse files Browse the repository at this point in the history
  • Loading branch information
twelvelabs authored Mar 10, 2024
1 parent c4eb3fe commit 4f7a267
Show file tree
Hide file tree
Showing 5 changed files with 47 additions and 63 deletions.
15 changes: 5 additions & 10 deletions internal/cmd/files.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,20 +60,15 @@ func (a *FilesAction) Run(ctx context.Context) error {
}

pipeline := stylist.NewPipeline(processors, excludes)
err = pipeline.Index(ctx, a.pathSpecs)
matches, err := pipeline.Match(ctx, a.pathSpecs)
if err != nil {
return err
}

for _, processor := range processors {
fmt.Printf("Processor: %s\n", processor.Name)
paths := processor.Paths()
if len(paths) == 0 {
fmt.Printf(" [no matching files]\n")
} else {
for _, path := range processor.Paths() {
fmt.Printf(" - %s\n", path)
}
for _, match := range matches {
fmt.Printf("Processor: %s\n", match.Processor.Name)
for _, path := range match.Paths {
fmt.Printf(" - %s\n", path)
}
fmt.Printf("\n")
}
Expand Down
7 changes: 6 additions & 1 deletion internal/cmd/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,11 +72,16 @@ func (a *InitAction) Run(ctx context.Context) error {
presets := store.All()
excludes := config.Excludes
pipeline := stylist.NewPipeline(presets, excludes)
processors, err := pipeline.Match(ctx, []string{"."})
matches, err := pipeline.Match(ctx, []string{"."})
if err != nil {
return err
}

processors := []*stylist.Processor{}
for _, match := range matches {
processors = append(processors, match.Processor)
}

// Generate a new config file containing all matching presets.
// Commenting out everything but the `preset: foo` line so that
// users can see what the preset is doing and how to override.
Expand Down
54 changes: 23 additions & 31 deletions internal/stylist/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,8 @@ type Pipeline struct {
excludes []string
}

// Index populates the paths for each processor in the pipeline.
//
// The source paths are resolved from each path spec; matched against
// any global exclude patterns; then matched against each processor's
// individual type, include, and exclude patterns.
func (p *Pipeline) Index(ctx context.Context, pathSpecs []string) error {
// Match returns all processors that match the given path specs.
func (p *Pipeline) Match(ctx context.Context, pathSpecs []string) ([]PipelineMatch, error) {
logger := AppLogger(ctx)

// Aggregate each processor's include patterns
Expand All @@ -47,16 +43,17 @@ func (p *Pipeline) Index(ctx context.Context, pathSpecs []string) error {
includes,
p.excludes,
)

// Create an index of paths (resolved from the path specs),
// matching any of the include patterns used by our processors.
// Doing this once is _much_ faster than once per-processor,
// especially when dealing w/ very large projects and many processors or patterns.
indexer := NewPathIndexer(includes, p.excludes)
if err := indexer.Index(ctx, pathSpecs...); err != nil {
return err
return nil, err
}
logger.Debugf("Indexed in %s", time.Since(startedAt))

matches := []PipelineMatch{}
// For each processor...
for _, processor := range p.processors {
// Gather all paths matching the include patterns
Expand All @@ -76,7 +73,7 @@ func (p *Pipeline) Index(ctx context.Context, pathSpecs []string) error {
for _, pattern := range processor.Excludes {
ok, err := doublestar.PathMatch(pattern, path)
if err != nil {
return err
return nil, err
}
if ok {
excluded = true
Expand All @@ -87,25 +84,15 @@ func (p *Pipeline) Index(ctx context.Context, pathSpecs []string) error {
}
}

sort.Strings(paths)
processor.paths = paths
}

logger.Debugf("Indexed in %s", time.Since(startedAt))
return nil
}

// Match returns all processors that match the given path specs.
func (p *Pipeline) Match(ctx context.Context, pathSpecs []string) ([]*Processor, error) {
if err := p.Index(ctx, pathSpecs); err != nil {
return nil, err
}
matches := []*Processor{}
for _, processor := range p.processors {
if len(processor.Paths()) > 0 {
matches = append(matches, processor)
if len(paths) > 0 {
sort.Strings(paths)
matches = append(matches, PipelineMatch{
Paths: paths,
Processor: processor,
})
}
}

return matches, nil
}

Expand All @@ -122,8 +109,8 @@ func (p *Pipeline) Fix(ctx context.Context, pathSpecs []string) ([]*Result, erro
func (p *Pipeline) execute(
ctx context.Context, pathSpecs []string, ct CommandType,
) ([]*Result, error) {
// Get all the processors that match the pathSpecs.
processors, err := p.Match(ctx, pathSpecs)
// Match the pathSpecs.
matches, err := p.Match(ctx, pathSpecs)
if err != nil {
return nil, err
}
Expand All @@ -142,10 +129,10 @@ func (p *Pipeline) execute(

// Execute the processors in goroutines and aggregate their results.
results := []*Result{}
for _, processor := range processors {
processor := processor
for _, match := range matches {
match := match
group.Go(func() error {
pr, err := processor.Execute(ctx, ct)
pr, err := match.Processor.Execute(ctx, ct, match.Paths)
if err != nil {
return err
}
Expand Down Expand Up @@ -177,6 +164,11 @@ func (p *Pipeline) execute(
return results, nil
}

type PipelineMatch struct {
Paths []string
Processor *Processor
}

type ResultsTransformer func(ctx context.Context, results []*Result) ([]*Result, error)

func FilterResults(ctx context.Context, results []*Result) ([]*Result, error) {
Expand Down
17 changes: 8 additions & 9 deletions internal/stylist/pipeline_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,13 +142,13 @@ func TestPipeline_Check(t *testing.T) {
}
}

func TestPipeline_Index(t *testing.T) {
func TestPipeline_Match(t *testing.T) {
tests := []struct {
desc string
processors []*Processor
excludes []string
pathSpecs []string
expectFunc func(t *testing.T, pipeline *Pipeline)
expectFunc func(t *testing.T, matches []PipelineMatch)
err string
}{
{
Expand Down Expand Up @@ -178,10 +178,10 @@ func TestPipeline_Index(t *testing.T) {
pathSpecs: []string{
"testdata/txt",
},
expectFunc: func(t *testing.T, pipeline *Pipeline) {
expectFunc: func(t *testing.T, matches []PipelineMatch) {
t.Helper()

p1 := pipeline.processors[0]
assert.Len(t, matches, 2)
assert.ElementsMatch(t, []string{
"testdata/txt/001/011/111/aaa.txt",
"testdata/txt/001/011/aaa.txt",
Expand All @@ -190,9 +190,8 @@ func TestPipeline_Index(t *testing.T) {
"testdata/txt/003/033/aaa.txt",
"testdata/txt/003/aaa.txt",
"testdata/txt/aaa.txt",
}, p1.Paths())
}, matches[0].Paths)

p2 := pipeline.processors[1]
assert.ElementsMatch(t, []string{
"testdata/txt/001/011/111/aaa.txt",
"testdata/txt/001/011/aaa.txt",
Expand All @@ -201,7 +200,7 @@ func TestPipeline_Index(t *testing.T) {
"testdata/txt/002/022/aaa.txt",
"testdata/txt/002/aaa.txt",
"testdata/txt/aaa.txt",
}, p2.Paths())
}, matches[1].Paths)
},
},
}
Expand All @@ -212,7 +211,7 @@ func TestPipeline_Index(t *testing.T) {
ctx := app.InitContext(context.Background())

pipeline := NewPipeline(tt.processors, tt.excludes)
err := pipeline.Index(ctx, tt.pathSpecs)
matches, err := pipeline.Match(ctx, tt.pathSpecs)

if tt.err == "" {
assert.NoError(t, err)
Expand All @@ -221,7 +220,7 @@ func TestPipeline_Index(t *testing.T) {
}

if tt.expectFunc != nil {
tt.expectFunc(t, pipeline)
tt.expectFunc(t, matches)
}
})
}
Expand Down
17 changes: 5 additions & 12 deletions internal/stylist/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,21 +17,19 @@ type Processor struct {
Excludes []string `yaml:"excludes,omitempty"`
CheckCommand *Command `yaml:"check,omitempty"`
FixCommand *Command `yaml:"fix,omitempty"`

paths []string
}

// Execute runs the given command for the current set of paths.
func (p *Processor) Execute(ctx context.Context, ct CommandType) ([]*Result, error) {
// Execute runs the given command for paths.
func (p *Processor) Execute(
ctx context.Context, ct CommandType, paths []string,
) ([]*Result, error) {
// Resolve the command to execute.
var cmd *Command
switch ct {
case CommandTypeCheck:
cmd = p.CheckCommand
case CommandTypeFix:
cmd = p.FixCommand
default:
panic(fmt.Sprintf("unknown command type '%s'", ct.String()))
}

if cmd == nil {
Expand All @@ -40,7 +38,7 @@ func (p *Processor) Execute(ctx context.Context, ct CommandType) ([]*Result, err
}

// Delegate to the command.
return cmd.Execute(ctx, p.Name, p.Paths())
return cmd.Execute(ctx, p.Name, paths)
}

// Merge merges the receiver and arguments and returns a new processor
Expand All @@ -54,11 +52,6 @@ func (p *Processor) Merge(others ...*Processor) *Processor {
return dst
}

// Paths returns all paths matched by the processor.
func (p *Processor) Paths() []string {
return p.paths
}

// ProcessorFilter filters processors by name and/or tag.
type ProcessorFilter struct {
Names []string
Expand Down

0 comments on commit 4f7a267

Please sign in to comment.