-
Notifications
You must be signed in to change notification settings - Fork 14
/
Copy pathtask.go
89 lines (71 loc) · 2.13 KB
/
task.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
package metafora
import (
"encoding/json"
"sync"
"time"
)
// Task is the minimum interface for Tasks to implement.
//
// The only requirement is an ID method; NewTask returns the simplest
// implementation, which is nothing more than a string ID.
type Task interface {
	// ID is the immutable globally unique ID for this task.
	ID() string
}
// basictask is a Task whose entire state is its string ID.
type basictask string
// NewTask wraps id in the simplest Task implementation available: a plain
// string whose ID method hands back id unchanged.
func NewTask(id string) Task {
	basic := basictask(id)
	return basic
}
// ID returns the string this task was created from.
func (t basictask) ID() string {
	id := string(t)
	return id
}
// RunningTask represents tasks running within a consumer.
type RunningTask interface {
	// Task is the Task being run.
	Task() Task

	// Started is the time the task was started by this consumer.
	Started() time.Time

	// Stopped is the first time Stop() was called on this task, or zero if it
	// has yet to be called. Tasks may take an indeterminate amount of time to
	// shut down after Stop() is called.
	Stopped() time.Time

	// Handler implementation called for this task.
	Handler() Handler
}
// runtask is the per-task state Metafora tracks internally.
type runtask struct {
	// task is the original Task from the coordinator
	task Task

	// h is the handler on which Run and Stop are called
	h Handler

	// stopL serializes calls to task.h.Stop() to make handler implementations
	// easier/safer as well as guard stopped
	stopL sync.Mutex

	// started is when the task was started; stopped is when Stop was first
	// called and stays the zero time until then
	started time.Time
	stopped time.Time
}
// newTask builds the internal record for task, pairing it with handler h and
// stamping the start time with the current time. The stop time is left zero.
func newTask(task Task, h Handler) *runtask {
	rt := new(runtask)
	rt.task = task
	rt.h = h
	rt.started = time.Now()
	return rt
}
// stop records the time of the first stop request and then invokes the
// handler's Stop. The whole call runs while holding stopL, so concurrent
// callers reach the handler's Stop one at a time.
func (t *runtask) stop() {
	t.stopL.Lock()
	defer t.stopL.Unlock()

	// Only the first call sets the timestamp; later calls still reach the
	// handler's Stop but leave stopped untouched.
	if firstCall := t.stopped.IsZero(); firstCall {
		t.stopped = time.Now()
	}
	t.h.Stop()
}
// Task returns the Task stored in this record.
func (t *runtask) Task() Task {
	return t.task
}
// Handler returns the Handler stored in this record.
func (t *runtask) Handler() Handler {
	return t.h
}
// Started returns the start time recorded for the task.
func (t *runtask) Started() time.Time {
	return t.started
}
// Stopped returns the recorded stop time, or the zero time when none has been
// recorded yet. The value is read while holding stopL.
func (t *runtask) Stopped() time.Time {
	t.stopL.Lock()
	when := t.stopped
	t.stopL.Unlock()
	return when
}
// MarshalJSON encodes the task as a JSON object holding its "id" and
// "started" time. A "stopped" key is added only once a stop time has been
// recorded; otherwise it is omitted.
func (t *runtask) MarshalJSON() ([]byte, error) {
	type view struct {
		ID      string     `json:"id"`
		Started time.Time  `json:"started"`
		Stopped *time.Time `json:"stopped,omitempty"`
	}

	out := view{
		ID:      t.task.ID(),
		Started: t.started,
	}

	// Leaving the pointer nil keeps the key out of the output entirely.
	if stoppedAt := t.Stopped(); !stoppedAt.IsZero() {
		out.Stopped = &stoppedAt
	}
	return json.Marshal(&out)
}