-
-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathparallel.go
78 lines (68 loc) · 1.44 KB
/
parallel.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
package pipeline
import (
"context"
"fmt"
)
// parallel is the Stage implementation produced by Parallel. It pairs a
// stage identifier with the set of tasks that every incoming Data item is
// handed to.
type parallel struct {
	// id is the identifier reported by the ID method.
	id string
	// tasks are the Task values run concurrently for each Data item.
	tasks []Task
}
// Parallel returns a Stage that passes a copy of each incoming Data
// to all specified tasks, waits for all the tasks to finish before
// sending data to the next stage, and only passes the original Data
// through to the following stage.
func Parallel(id string, tasks ...Task) Stage {
if len(tasks) == 0 {
return nil
}
return ¶llel{
id: id,
tasks: tasks,
}
}
// ID implements Stage by reporting the identifier held in the stage's
// id field.
func (p *parallel) ID() string {
	return p.id
}
// Run implements Stage. It keeps handing executeTask to processStageData
// and returns as soon as processStageData reports false.
func (p *parallel) Run(ctx context.Context, sp StageParams) {
	for processStageData(ctx, sp, p.executeTask) {
	}
}
// executeTask runs every task of the stage concurrently, each against its
// own clone of data, and waits for all of them to finish.
//
// An error returned by a task is appended to sp.Error(), wrapped with the
// stage position so the underlying error can still be matched with
// errors.Is / errors.As. If the context is already done on entry, or if any
// task yields nil Data, (nil, nil) is returned and nothing is sent on
// sp.Output(). Otherwise the original data is sent on sp.Output() (unless
// the context is done first) and returned.
func (p *parallel) executeTask(ctx context.Context, data Data, sp StageParams) (Data, error) {
	select {
	case <-ctx.Done():
		return nil, nil
	default:
	}

	// Buffered to one slot per task so a task goroutine never blocks when
	// reporting its result.
	done := make(chan Data, len(p.tasks))
	for i := range p.tasks {
		go func(idx int, clone Data) {
			d, err := p.tasks[idx].Process(ctx, clone, &taskParams{
				pipeline: sp.Pipeline(),
				registry: sp.Registry(),
			})
			if err != nil {
				// %w keeps the task's error in the chain.
				sp.Error().Append(fmt.Errorf("pipeline stage %d: %w", sp.Position(), err))
			}
			done <- d
		}(i, data.Clone())
	}

	// Collect exactly one result per task; a nil result from any task
	// marks the whole item as failed, but all results are still received.
	var failed bool
	for range p.tasks {
		if d := <-done; d == nil {
			failed = true
		}
	}
	if failed {
		return nil, nil
	}

	select {
	case <-ctx.Done():
	case sp.Output() <- data:
	}
	return data, nil
}