This repository has been archived by the owner on Aug 28, 2021. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmain.go
122 lines (111 loc) · 3.31 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
package main
import (
"flag"
"os"
"time"
"github.com/globalsign/mgo"
"github.com/globalsign/mgo/bson"
llog "github.com/levenlabs/go-llog"
)
// replInfo holds the replication flags decoded from the "repl" section of a
// serverStatus reply.
//
// The bson keys are spelled out explicitly; they are the same lowercase names
// mgo would derive from the field names by default.
type replInfo struct {
	// IsMaster is the "ismaster" flag of the reply.
	IsMaster bool `bson:"ismaster"`
	// Secondary is the "secondary" flag of the reply.
	Secondary bool `bson:"secondary"`
}
// serverStatus is the subset of the mongod serverStatus reply that the
// watchdog decodes: the process id and the replication flags.
//
// The bson keys are spelled out explicitly; they are the same lowercase names
// mgo would derive from the field names by default.
type serverStatus struct {
	// PID is the process id reported by the server. It stays 0 when the
	// reply could not be decoded.
	PID int `bson:"pid"`
	// Repl carries the replication flags from the "repl" section.
	Repl replInfo `bson:"repl"`
}
// main parses the command-line flags, connects to the local mongod and hands
// control to the watchdog loop in spin.
//
// Flags:
//
//	--socket-timeout     socket/dial timeout for the mongo session (default 30s)
//	--update-interval    how often the watchdog document is upserted (default 15s)
//	--failure-threshold  how long upserts may keep failing before mongod is killed (default 2m)
//	--log-level          llog level: error, warn, info or debug (default info)
//	--addr               address of the local mongod (default 127.0.0.1:27017)
//
// Any invalid flag value or a failed connection is reported through
// llog.Fatal.
func main() {
	st := flag.String("socket-timeout", "30s", "duration of the socket timeout")
	ui := flag.String("update-interval", "15s", "how often to try and update the local db")
	ft := flag.String("failure-threshold", "2m", "how long since the last successful update should we try to kill mongod")
	ll := flag.String("log-level", "info", "the llog log level (error, warn, info, debug)")
	addr := flag.String("addr", "127.0.0.1:27017", "local mongo address, must be on the same server")
	flag.Parse()

	socketTimeout, err := time.ParseDuration(*st)
	if err != nil {
		llog.Fatal("failed to parse --socket-timeout", llog.ErrKV(err))
	}
	updateInterval, err := time.ParseDuration(*ui)
	if err != nil {
		llog.Fatal("failed to parse --update-interval", llog.ErrKV(err))
	}
	// The interval is fed to time.NewTicker, which panics on a non-positive
	// duration. Reject such values here with a readable error instead.
	if updateInterval <= 0 {
		llog.Fatal("--update-interval must be greater than zero", llog.KV{"updateInterval": *ui})
	}
	failureThreshold, err := time.ParseDuration(*ft)
	if err != nil {
		llog.Fatal("failed to parse --failure-threshold", llog.ErrKV(err))
	}
	if err := llog.SetLevelFromString(*ll); err != nil {
		llog.Fatal("failed to set --log-level", llog.ErrKV(err))
	}

	sess, err := connect(*addr, socketTimeout)
	if err != nil {
		llog.Fatal("error connecting to mongo", llog.ErrKV(err))
	}
	spin(sess, updateInterval, failureThreshold)
}
// connect opens a direct connection to the mongod at addr, using the "local"
// database, and configures the returned session for the watchdog.
//
// timeout is used as the dial timeout, as the socket timeout and, converted
// to whole milliseconds, as the write timeout of the safe mode. Writes are
// issued with J set to true and the session is put into Eventual mode.
// A dial failure is returned as-is together with a nil session.
func connect(addr string, timeout time.Duration) (*mgo.Session, error) {
	url := "mongodb://" + addr + "/local?connect=direct"

	sess, err := mgo.DialWithTimeout(url, timeout)
	if err != nil {
		return nil, err
	}

	safe := &mgo.Safe{
		J:        true,
		WTimeout: int(timeout / time.Millisecond),
	}
	sess.SetSocketTimeout(timeout)
	sess.SetSafe(safe)
	sess.SetMode(mgo.Eventual, false)
	return sess, nil
}
// upsert runs serverStatus and, when the instance reports itself as master,
// writes the current time into the "watchdog" document of the local database.
//
// The decoded status is returned in every case, including alongside a non-nil
// error, so that the caller can still read the PID when the write fails.
// A non-master instance is skipped and reported as success.
func upsert(sess *mgo.Session) (serverStatus, error) {
	var status serverStatus

	err := sess.Run("serverStatus", &status)
	switch {
	case err != nil:
		return status, err
	case !status.Repl.IsMaster:
		llog.Debug("ignoring non-primary instance")
		return status, nil
	}

	// The heartbeat does not need to be replicated, hence the local database.
	selector := bson.M{"_id": "watchdog"}
	doc := bson.M{"lastUpdate": time.Now()}
	if _, err = sess.DB("local").C("watchdog").Upsert(selector, doc); err != nil {
		return status, err
	}

	llog.Debug("upserted into local watchdog collection")
	return status, nil
}
// spin is the watchdog loop: every updateInterval it calls upsert and tracks
// how long upserts have been failing without interruption.
//
// A successful upsert, or a failure that yields no PID, clears the failure
// window. Otherwise the window starts at the first failure, and once it has
// lasted at least failureThreshold the process with the reported PID is
// killed on every failing tick. The loop ranges over the ticker channel and
// contains no return.
func spin(sess *mgo.Session, updateInterval, failureThreshold time.Duration) {
	var failingSince time.Time

	tick := time.NewTicker(updateInterval)
	defer tick.Stop()

	for range tick.C {
		status, err := upsert(sess)

		switch {
		case err == nil:
			failingSince = time.Time{}
			continue
		case status.PID == 0:
			// Without a PID there is nothing to kill, so the failure is
			// logged and otherwise ignored.
			failingSince = time.Time{}
			llog.Error("error running serverStatus", llog.ErrKV(err))
			continue
		}

		llog.Error("error updating local watchdog serverStatus", llog.ErrKV(err))
		if failingSince.IsZero() {
			failingSince = time.Now()
		}

		// Still inside the grace period: wait for the next tick.
		if time.Since(failingSince) < failureThreshold {
			continue
		}

		llog.Info("killing mongod instance", llog.KV{"pid": status.PID})
		proc, killErr := os.FindProcess(status.PID)
		if killErr == nil {
			killErr = proc.Kill()
		}
		if killErr != nil {
			llog.Error("error finding and killing the mongod pid", llog.KV{"pid": status.PID}, llog.ErrKV(killErr))
		}
	}
}