-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmeta.go
145 lines (125 loc) · 3.27 KB
/
meta.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
package amboy
import (
"context"
"time"
"github.com/tychoish/fun/erc"
"github.com/tychoish/grip"
"github.com/tychoish/grip/message"
)
// ResolveErrors takes a queue object and iterates over the results
// and returns a single aggregated error for the queue's job. The
// completeness of this operation depends on the implementation of a
// the queue implementation's Results() method.
func ResolveErrors(ctx context.Context, q Queue) error {
catcher := &erc.Collector{}
ExtractErrors(ctx, catcher, q)
return catcher.Resolve()
}
// ExtractErrors adds any errors in the completed jobs of the queue to
// the specified catcher.
func ExtractErrors(ctx context.Context, catcher *erc.Collector, q Queue) {
for result := range q.Jobs(ctx) {
if !result.Status().Completed {
continue
}
if err := ctx.Err(); err != nil {
catcher.Add(err)
break
}
catcher.Add(result.Error())
}
}
// RetrieveErrors adds any errors in the completed jobs of the queue to
// the specified error channel.
func RetrieveErrors(ctx context.Context, errs chan<- error, q Queue) {
for result := range q.Jobs(ctx) {
if !result.Status().Completed {
continue
}
if err := ctx.Err(); err != nil {
return
}
if err := result.Error(); err != nil {
select {
case <-ctx.Done():
return
case errs <- err:
continue
}
}
}
}
// PopulateQueue adds jobs from a channel to a queue and returns an
// error with the aggregated results of these operations.
func PopulateQueue(ctx context.Context, q Queue, jobs <-chan Job) error {
catcher := &erc.Collector{}
for j := range jobs {
if err := ctx.Err(); err != nil {
catcher.Add(err)
break
}
catcher.Add(q.Put(ctx, j))
}
return catcher.Resolve()
}
// QueueReport holds the ids of all tasks in a queue by state.
type QueueReport struct {
Completed []string `json:"completed"`
InProgress []string `json:"in_progress"`
Pending []string `json:"pending"`
}
// Report returns a QueueReport status for the state of a queue.
func Report(ctx context.Context, q Queue, limit int) QueueReport {
var out QueueReport
if limit == 0 {
return out
}
var count int
for j := range q.Jobs(ctx) {
stat := j.Status()
switch {
case stat.Completed:
out.Completed = append(out.Completed, stat.ID)
case stat.InProgress:
out.InProgress = append(out.InProgress, stat.ID)
default:
out.Pending = append(out.Pending, stat.ID)
}
count++
if limit > 0 && count >= limit {
break
}
}
return out
}
// RunJob executes a single job directly, without a queue, with
// similar semantics as it would execute in a queue: MaxTime is
// respected, and it uses similar logging as is present in the queue,
// with errors propogated functionally.
func RunJob(ctx context.Context, job Job) error {
var cancel context.CancelFunc
ti := job.TimeInfo()
ti.Start = time.Now()
job.UpdateTimeInfo(ti)
if ti.MaxTime > 0 {
ctx, cancel = context.WithTimeout(ctx, ti.MaxTime)
} else {
ctx, cancel = context.WithCancel(ctx)
}
defer cancel()
job.Run(ctx)
ti.End = time.Now()
job.UpdateTimeInfo(ti)
msg := message.Fields{
"job": job.ID(),
"job_type": job.Type().Name,
"duration_secs": ti.Duration().Seconds(),
}
err := job.Error()
if err != nil {
grip.Error(message.WrapError(err, msg))
} else {
grip.Debug(msg)
}
return err
}