-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathshutdown.go
120 lines (107 loc) · 3.06 KB
/
shutdown.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
package sqspoller
import (
"time"
)
// shutdownMode is the type for a mode of shutdown.
type shutdownMode int
// shutdown modes:
const (
now shutdownMode = iota
graceful
after
)
// shutdown holds information needed to perform the required shutdown.
type shutdown struct {
sig shutdownMode
timeout <-chan time.Time
}
// ShutdownGracefully gracefully shuts down the poller.
func (p *Poller) ShutdownGracefully() error {
return p.shutdownForInput(&shutdown{
sig: graceful,
timeout: nil,
})
}
// ShutdownAfter will attempt to shutdown gracefully, if graceful shutdown cannot
// be achieved within the given time frame, the Poller will exit, potentially
// leaking unhandled resources.
func (p *Poller) ShutdownAfter(t time.Duration) error {
return p.shutdownForInput(&shutdown{
sig: after,
timeout: time.After(t),
})
}
// ShutdownNow shuts down the Poller instantly, potentially leaking unhandled
// resources.
func (p *Poller) ShutdownNow() error {
return p.shutdownForInput(&shutdown{
sig: now,
timeout: nil,
})
}
// shutdownForInput executes the shutdown for the given input.
func (p *Poller) shutdownForInput(input *shutdown) error {
if err := p.checkAndSetShuttingDownStatus(); err != nil {
return err
}
p.shutdown <- input
return <-p.shutdownErrors
}
// handleShutdown handles the shutdown orchestration for the different shutdown
// modes.
func (p *Poller) handleShutdown(sd *shutdown, pollingErrors <-chan error) error {
switch sd.sig {
case now:
p.stopRequest <- struct{}{}
p.shutdownErrors <- nil
return ErrShutdownNow
case graceful:
finalErr := p.finishCurrentJob(pollingErrors)
err := <-finalErr
p.shutdownErrors <- nil
return err
case after:
finalErr := p.finishCurrentJob(pollingErrors)
select {
case err := <-finalErr:
p.shutdownErrors <- nil
return err
case <-sd.timeout:
p.shutdownErrors <- ErrShutdownGraceful
return ErrShutdownGraceful
}
default:
// This code should never be reached! Urgent fix
// required if this error is ever returned!
p.shutdownErrors <- ErrIntegrityIssue
return ErrIntegrityIssue
}
}
// finishCurrentJob sends a stop request to the poller to tell it to stop making
// more polls after it has finished handling its current job. The returned channel
// will return the final error once the poller has been confirmed to have stopped
// polling.
func (p *Poller) finishCurrentJob(pollingErrors <-chan error) <-chan error {
p.stopRequest <- struct{}{}
p.exitWait <- struct{}{}
finalErr := make(chan error)
go func() {
err := <-pollingErrors
<-p.stopConfirmed
finalErr <- err
}()
return finalErr
}
// stopRequestReceived is called at the end of a poll cycle to check whether any
// stop requests have been made. If a stop request is received, the function will
// return true, to tell the poller to break the polling loop. This should happen
// before a graceful shutdown to ensure that no more requests to the queue are made.
func (p *Poller) stopRequestReceived() bool {
select {
case <-p.stopRequest:
p.stopConfirmed <- struct{}{}
return true
default:
return false
}
}