/*
Command vpp-monitoring-agent implements a configurable standalone VPP
monitoring agent. The agent is written in Go (with Cgo bindings) and connects
to VPP through its shared memory APIs.
*/
package main

import (
	"fmt"
	"net/http"
	_ "net/http/pprof"
	"os"

	log "github.com/Sirupsen/logrus"
	"github.com/natefinch/lumberjack"

	"pnda/vpp/monitoring/aggregator"
	"pnda/vpp/monitoring/collector"
	"pnda/vpp/monitoring/collector/keepalive"
	"pnda/vpp/monitoring/config"
	"pnda/vpp/monitoring/govpp"
	"pnda/vpp/monitoring/util"
)
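
// CONNECTION_NAME identifies this agent's connection to VPP.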
const CONNECTION_NAME = "vpp-monitoring-agent"

func init() {
	log.SetOutput(os.Stdout)
	log.SetLevel(log.InfoLevel)
}
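
// main wires aggregators, producers and collectors from the parsed
// configuration, then supervises the VPP connection, exiting with a non-zero
// code on keepalive failure so an external loop can restart the agent.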
func main() {
	args := config.ParseFlags()
	log.WithFields(log.Fields{
		"args": util.StringOfNoType(args),
	}).Info("Starting vpp-monitoring-agent")

	// Redirect logging output to a size-capped, rotated log file
	log.SetOutput(&lumberjack.Logger{
		Filename:   args.LogFile,
		MaxSize:    10, // megabytes
		MaxBackups: 2,
		MaxAge:     0, // never delete by age
	})

	if args.Debug {
		log.SetLevel(log.DebugLevel)
	}

	vppUuid := args.VppUuid
	wiringAndConfig := args.Wiring.Parse()

	// Process aggregator wiring and create instances
	aggregatorMap := createAggregators(wiringAndConfig)
	// Process producer wiring and create instances
	createAndStartProducers(wiringAndConfig, aggregatorMap)
	// Start aggregators
	startAggregators(aggregatorMap, vppUuid)
	// TODO close started aggregators and producers

	if args.Profile {
		go startProfiling(args)
	}
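
	// Supervision loop: connect to VPP, schedule all collectors, then block
	// until the keepalive reports a failure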
	for {
		log.Info("Starting VPP monitoring agent")
		connAttempt := &govpp.VppConnectionAttempt{Name: CONNECTION_NAME}
		connection := connAttempt.Connect()

		keepaliveFailureCh := make(chan int)
		keepaliveStopCh := make(chan int)
		keepaliveExec := keepalive.KeepaliveCollectorConfiguration{
			Name:    "Keepalive-executor",
			Timeout: 10,
		}.Create(keepaliveFailureCh)
		collector.CollectScheduled(connection, keepaliveExec, 10, keepaliveStopCh)

		var collectorExecutionStopChannels []chan int
		// Put the first stop channel (for keepalive) in
		collectorExecutionStopChannels = append(collectorExecutionStopChannels, keepaliveStopCh)

		var createdCollectors []collector.Collector
		for _, clctrWiringAndConfig := range wiringAndConfig.Collectors {
			clctr := clctrWiringAndConfig.Config.Create(aggregatorMap[clctrWiringAndConfig.Aggregator])
			createdCollectors = append(createdCollectors, clctr)
			stopCh := scheduleCollector(clctrWiringAndConfig, connection, clctr)
			if stopCh != nil {
				collectorExecutionStopChannels = append(collectorExecutionStopChannels, stopCh)
			}
		}

		// Block until a keepalive fails
		<-keepaliveFailureCh
		close(keepaliveFailureCh)
		log.Error("Keepalive failure detected, reconnecting VPP and reinitializing collectors")

		log.Info("Stopping all collector executions")
		for _, stopChannel := range collectorExecutionStopChannels {
			stopChannel <- -1
		}
		log.Info("Closing all collectors")
		for _, createdCollector := range createdCollectors {
			createdCollector.Close()
		}

		// This reconnect loop only works in theory: disconnecting from and
		// reconnecting to VPP still crashes the program, so an external loop
		// is needed to restart the agent.
		//connection.Disconnect()
		// Instead of disconnecting and looping, exit with code 100 as an
		// indication to the external loop that the agent should be restarted.
		os.Exit(100)
	}
}
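
// createAggregators instantiates an aggregator for each entry in the wiring
// configuration, keyed by its configured name.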
func createAggregators(wiringAndConfig config.WiringConfiguration) map[string]aggregator.Aggregator {
	aggregatorMap := make(map[string]aggregator.Aggregator)
	for name, aggrWiringAndConfig := range wiringAndConfig.Aggregators {
		aggr := aggrWiringAndConfig.Config.Create()
		aggregatorMap[name] = aggr
	}
	return aggregatorMap
}
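
// startAggregators starts each aggregator with the UUID of the monitored VPP
// instance.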
func startAggregators(aggregatorMap map[string]aggregator.Aggregator, uuid aggregator.VppUuid) {
	for _, aggr := range aggregatorMap {
		aggr.Start(uuid)
	}
}
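
// createAndStartProducers instantiates the configured producers and attaches
// each to its wired aggregator.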
func createAndStartProducers(wiringAndConfig config.WiringConfiguration, aggregatorMap map[string]aggregator.Aggregator) {
	for _, producerWiringAndConfig := range wiringAndConfig.Producers {
		prod := producerWiringAndConfig.Config.Create()
		prod.Start(aggregatorMap[producerWiringAndConfig.Aggregator])
	}
}

// Go to http://localhost:<debug-port>/debug/pprof/ to evaluate profiling results
func startProfiling(args config.Args) {
	log.Info("Exposing profiling information")
	http.ListenAndServe(fmt.Sprintf(":%v", args.ProfilePort), http.DefaultServeMux)
}
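
// scheduleCollector schedules a collector according to its configured
// scheduling type. Repeated scheduling returns a stop channel that terminates
// the periodic execution; one-shot and notification scheduling return nil.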
func scheduleCollector(clctrWiringAndConfig config.CollectorWiring, connection *govpp.VppConnection, clctr collector.Collector) chan int {
	switch clctrWiringAndConfig.Scheduling.SchedulingType {
	case collector.NOTIFICATION_SCHEDULING:
		fallthrough
	case collector.ONCE_SCHEDULING:
		collector.CollectOnce(connection, clctr)
		return nil
	case collector.REPEATED_SCHEDULING:
		if clctrWiringAndConfig.Scheduling.SchedulingDelay < 1 {
			log.WithFields(log.Fields{
				"delay":     clctrWiringAndConfig.Scheduling.SchedulingDelay,
				"component": clctrWiringAndConfig.Name,
			}).Panic("Invalid scheduling delay, needs to be >0")
		}
		stopCh := make(chan int)
		collector.CollectScheduled(connection, clctr,
			clctrWiringAndConfig.Scheduling.SchedulingDelay, stopCh)
		return stopCh
	default:
		log.WithFields(log.Fields{
			"format":    clctrWiringAndConfig.Scheduling.SchedulingType,
			"component": clctrWiringAndConfig.Name,
		}).Panic("Unrecognized scheduling type setting")
		return nil
	}
}