From 661a0f3bb7d91af696305f005ac9b47a9fa791fb Mon Sep 17 00:00:00 2001 From: toni-moreno Date: Mon, 1 May 2017 08:40:17 +0200 Subject: [PATCH] device stats refactor, also added inheritDeviceTags option to send device tags to selfmon statistics backend, related to #192 --- .bra.toml | 4 +- conf/sample.config.toml | 2 + pkg/agent/device/measgather.go | 50 +++------- pkg/agent/device/snmpdevice.go | 65 +++++++------ pkg/agent/device/stats.go | 169 +++++++++++++++++++++++++-------- pkg/agent/selfmon/selfmon.go | 13 ++- pkg/config/mainconfig.go | 9 +- src/runtime/runtimeview.html | 8 +- 8 files changed, 203 insertions(+), 117 deletions(-) diff --git a/.bra.toml b/.bra.toml index b4577d98..95bff90b 100644 --- a/.bra.toml +++ b/.bra.toml @@ -1,6 +1,6 @@ [run] init_cmds = [ - ["go", "build", "-o", "./bin/snmpcollector", "./pkg/"], + ["go", "build", "-race","-o", "./bin/snmpcollector", "./pkg/"], ["./bin/snmpcollector"] ] watch_all = true @@ -10,6 +10,6 @@ watch_dirs = [ watch_exts = [".go", ".toml"] build_delay = 1500 cmds = [ - ["go", "build", "-o", "./bin/snmpcollector", "./pkg/"], + ["go", "build", "-race","-o", "./bin/snmpcollector", "./pkg/"], ["./bin/snmpcollector"] ] diff --git a/conf/sample.config.toml b/conf/sample.config.toml index 50138d2f..9ffcb0be 100644 --- a/conf/sample.config.toml +++ b/conf/sample.config.toml @@ -63,6 +63,8 @@ freq = 60 #prefix for measurement naming prefix = "" + #inherit device tags + inheritdevicetags = true #adds extra tags to the measurement config should be set as a csv - tag=value1,tag2=value2,...,tagN=valN extratags = [ "instance=snmpcollector01" ] diff --git a/pkg/agent/device/measgather.go b/pkg/agent/device/measgather.go index 4c06e3b1..c61df81a 100644 --- a/pkg/agent/device/measgather.go +++ b/pkg/agent/device/measgather.go @@ -7,9 +7,6 @@ import ( ) func (d *SnmpDevice) measConcurrentGatherAndSend() { - var totalGets int64 - var totalErrors int64 - startSnmpStats := time.Now() var wg sync.WaitGroup for _, m := range d.Measurements { @@ -20,20 +17,19 @@ func (d *SnmpDevice) measConcurrentGatherAndSend() { d.Debugf("-------Processing measurement : %s", m.ID) nGets, nErrors, _ := m.GetData() - totalGets += nGets - totalErrors += nErrors m.ComputeEvaluatedMetrics() m.ComputeOidConditionalMetrics() if nGets > 0 { - d.addGets(nGets) + d.stats.AddGets(nGets) } if nErrors > 0 { - d.addErrors(nErrors) + d.stats.AddErrors(nErrors) } //prepare batchpoint points := m.GetInfluxPoint(d.TagMap) + startInfluxStats := time.Now() if bpts != nil { (*bpts).AddPoints(points) //send data @@ -41,21 +37,14 @@ func (d *SnmpDevice) measConcurrentGatherAndSend() { } else { d.Warnf("Can not send data to the output DB becaouse of batchpoint creation error") } + elapsedInfluxStats := time.Since(startInfluxStats) + d.stats.AddSentDuration(elapsedInfluxStats) + }(m) } wg.Wait() elapsedSnmpStats := time.Since(startSnmpStats) - d.Infof("snmp pooling took [%s] SNMP: Gets [%d] Errors [%d]", elapsedSnmpStats, totalGets, totalErrors) - d.setGatherStats(startSnmpStats, elapsedSnmpStats) - if d.selfmon != nil { - fields := map[string]interface{}{ - "process_t": elapsedSnmpStats.Seconds(), - "getsent": totalGets, - "geterror": totalErrors, - } - d.selfmon.AddDeviceMetrics(d.cfg.ID, fields) - } - + d.stats.SetGatherDuration(startSnmpStats, elapsedSnmpStats) } func (d *SnmpDevice) measSeqGatherAndSend() { @@ -74,12 +63,6 @@ func (d *SnmpDevice) measSeqGatherAndSend() { m.ComputeEvaluatedMetrics() m.ComputeOidConditionalMetrics() - if nGets > 0 { - d.addGets(nGets) - } - if nErrors > 0 { - d.addErrors(nErrors) - } //prepare batchpoint points := m.GetInfluxPoint(d.TagMap) if bpts != nil { @@ -87,17 +70,14 @@ func (d *SnmpDevice) measSeqGatherAndSend() { } } - elapsedSnmpStats := time.Since(startSnmpStats) - d.Infof("snmp pooling took [%s] SNMP: Gets [%d] Errors [%d]", elapsedSnmpStats, totalGets, totalErrors) - d.setGatherStats(startSnmpStats, elapsedSnmpStats) - if d.selfmon != nil { - fields := map[string]interface{}{ - "process_t": elapsedSnmpStats.Seconds(), - "getsent": totalGets, - "geterror": totalErrors, - } - d.selfmon.AddDeviceMetrics(d.cfg.ID, fields) + if totalGets > 0 { + d.stats.AddGets(totalGets) + } + if totalErrors > 0 { + d.stats.AddErrors(totalErrors) } + elapsedSnmpStats := time.Since(startSnmpStats) + d.stats.SetGatherDuration(startSnmpStats, elapsedSnmpStats) /************************* * * Send data to InfluxDB process @@ -111,6 +91,6 @@ func (d *SnmpDevice) measSeqGatherAndSend() { d.Warnf("Can not send data to the output DB becaouse of batchpoint creation error") } elapsedInfluxStats := time.Since(startInfluxStats) - d.Infof("influx send took [%s]", elapsedInfluxStats) + d.stats.AddSentDuration(elapsedInfluxStats) } diff --git a/pkg/agent/device/snmpdevice.go b/pkg/agent/device/snmpdevice.go index b04bafa7..02944a47 100644 --- a/pkg/agent/device/snmpdevice.go +++ b/pkg/agent/device/snmpdevice.go @@ -50,17 +50,13 @@ type SnmpDevice struct { //snmpClient *gosnmp.GoSNMP snmpClientMap map[string]*gosnmp.GoSNMP Influx *output.InfluxDB `json:"-"` - LastError time.Time + //LastError time.Time //Runtime stats - Requests int64 - Gets int64 - Errors int64 - LastGatherTime time.Time - LastGatherDuration time.Duration - LastFltUpdateTime time.Time - LastFltUpdateDuration time.Duration - //runtime controls + stats DevStat //Runtime Internal statistic + Stats *DevStat //Public info for thread safe accessing to the data () + //runtime controls + mutex sync.Mutex ReloadLoopsPending int DeviceActive bool @@ -72,8 +68,7 @@ type SnmpDevice struct { chLogLevel chan string chExit chan bool chFltUpdate chan bool - mutex sync.Mutex - selfmon *selfmon.SelfMon + CurLogLevel string Gather func() `json:"-"` } @@ -90,18 +85,27 @@ func (d *SnmpDevice) GetLogFilePath() string { return d.cfg.LogFile } -func (d *SnmpDevice) setGatherStats(start time.Time, duration time.Duration) { +func (d *SnmpDevice) GetSelfThreadSafe() *SnmpDevice { d.mutex.Lock() defer d.mutex.Unlock() - d.LastGatherTime = start - d.LastGatherDuration = duration + d.Stats = d.GetBasicStats() + return d } -func (d *SnmpDevice) setFltUpdateStats(start time.Time, duration time.Duration) { - d.mutex.Lock() - defer d.mutex.Unlock() - d.LastFltUpdateTime = start - d.LastFltUpdateDuration = duration +// GetBasicStats get basic info for this device +func (d *SnmpDevice) GetBasicStats() *DevStat { + + sum := 0 + for _, m := range d.Measurements { + sum += len(m.OidSnmpMap) + } + stat := d.stats.ThSafeCopy() + stat.ReloadLoopsPending = d.ReloadLoopsPending + stat.DeviceActive = d.DeviceActive + stat.DeviceConnected = d.DeviceConnected + stat.NumMeasurements = len(d.Measurements) + stat.NumMetrics = sum + return stat } //ReloadLoopPending needs to be mutex excluded @@ -120,16 +124,10 @@ func (d *SnmpDevice) getReloadLoopsPending() int { func (d *SnmpDevice) decReloadLoopsPending() { d.mutex.Lock() + defer d.mutex.Unlock() if d.ReloadLoopsPending > 0 { d.ReloadLoopsPending-- } - d.mutex.Unlock() -} - -func (d *SnmpDevice) GetSelfThreadSafe() *SnmpDevice { - d.mutex.Lock() - defer d.mutex.Unlock() - return d } // GetOutSenderFromMap to get info about the sender will use @@ -358,6 +356,8 @@ func (d *SnmpDevice) Init(c *config.SnmpDeviceCfg) error { } else { d.Warnf("No map detected in device") } + // Init stats + d.stats.Init(d.cfg.ID, d.TagMap, d.log) if d.cfg.ConcurrentGather == true { d.Gather = d.measConcurrentGatherAndSend @@ -382,7 +382,7 @@ func (d *SnmpDevice) End() { // SetSelfMonitoring set the ouput device where send monitoring metrics func (d *SnmpDevice) SetSelfMonitoring(cfg *selfmon.SelfMon) { - d.selfmon = cfg + d.stats.SetSelfMonitoring(cfg) } // InitSnmpConnect does the SNMP client conection and retrieve system info @@ -417,7 +417,7 @@ func (d *SnmpDevice) startGatherGo(wg *sync.WaitGroup) { startSnmp := time.Now() d.InitDevMeasurements() elapsedSnmp := time.Since(startSnmp) - d.setFltUpdateStats(startSnmp, elapsedSnmp) + d.stats.SetFltUpdateStats(startSnmp, elapsedSnmp) d.Infof("snmp INIT runtime measurements/filters took [%s] ", elapsedSnmp) } else { @@ -438,7 +438,7 @@ func (d *SnmpDevice) startGatherGo(wg *sync.WaitGroup) { startSnmp := time.Now() d.InitDevMeasurements() elapsedSnmp := time.Since(startSnmp) - d.setFltUpdateStats(startSnmp, elapsedSnmp) + d.stats.SetFltUpdateStats(startSnmp, elapsedSnmp) d.Infof("snmp INIT runtime measurements/filters took [%s] ", elapsedSnmp) // Round collection to nearest interval by sleeping utils.WaitAlignForNextCicle(d.cfg.Freq, d.log) @@ -455,8 +455,9 @@ func (d *SnmpDevice) startGatherGo(wg *sync.WaitGroup) { * SNMP Gather data process * ***************************/ - d.resetCounters() + d.stats.ResetCounters() d.Gather() + d.stats.Send() /******************************************* * * Reload Indexes/Filters process(if needed) @@ -481,9 +482,10 @@ func (d *SnmpDevice) startGatherGo(wg *sync.WaitGroup) { m.InitBuildRuntime() } } + d.setReloadLoopsPending(d.cfg.UpdateFltFreq) elapsedIdxUpdateStats := time.Since(startIdxUpdateStats) - d.setFltUpdateStats(startIdxUpdateStats, elapsedIdxUpdateStats) + d.stats.SetFltUpdateStats(startIdxUpdateStats, elapsedIdxUpdateStats) d.Infof("Index reload took [%s]", elapsedIdxUpdateStats) } } @@ -499,6 +501,7 @@ func (d *SnmpDevice) startGatherGo(wg *sync.WaitGroup) { d.Infof("invoked EXIT from SNMP Gather process ") return case <-d.chFltUpdate: + d.setReloadLoopsPending(1) case debug := <-d.chDebug: d.StateDebug = debug diff --git a/pkg/agent/device/stats.go b/pkg/agent/device/stats.go index a8856569..b740879a 100644 --- a/pkg/agent/device/stats.go +++ b/pkg/agent/device/stats.go @@ -1,62 +1,151 @@ package device -import () +import ( + "github.com/Sirupsen/logrus" + "github.com/toni-moreno/snmpcollector/pkg/agent/selfmon" + "sync" + "time" +) // DevStat minimal info to show users type DevStat struct { - Requests int64 - Gets int64 - Errors int64 + //ID + id string + tagMap map[string]string + //Control + log *logrus.Logger + selfmon *selfmon.SelfMon + mutex sync.Mutex + //Counter Statistics + Requests int64 + Gets int64 + Errors int64 + + //time statistics + // t - Gathering all snmpdata + GatherTime time.Time + GatherDuration time.Duration + // t - Apply filters on measurements + FltUpdateTime time.Time + FltUpdateDuration time.Duration + // t - Send data over output backend + SentDuration time.Duration + + //device state ReloadLoopsPending int DeviceActive bool DeviceConnected bool - NumMeasurements int - NumMetrics int + //extra measurement statistics + NumMeasurements int + NumMetrics int +} + +func (s *DevStat) Init(id string, tm map[string]string, l *logrus.Logger) { + s.mutex.Lock() + defer s.mutex.Unlock() + s.id = id + s.tagMap = tm + s.log = l + +} + +func (s *DevStat) reset() { + s.Gets = 0 + s.Errors = 0 + s.SentDuration = 0 +} + +func (s *DevStat) getMetricFields() map[string]interface{} { + fields := map[string]interface{}{ + "process_t": s.GatherDuration.Seconds(), + "getsent": s.Gets, + "geterror": s.Errors, + } + return fields } -// GetBasicStats get basic info for this device -func (d *SnmpDevice) GetBasicStats() *DevStat { +// SetSelfMonitoring set the ouput device where send monitoring metrics +func (s *DevStat) SetSelfMonitoring(cfg *selfmon.SelfMon) { + s.mutex.Lock() + defer s.mutex.Unlock() + s.selfmon = cfg +} - sum := 0 - for _, m := range d.Measurements { - sum += len(m.OidSnmpMap) +// ThSafeCopy get a new object with public data copied in thread safe way +func (s *DevStat) ThSafeCopy() *DevStat { + s.mutex.Lock() + defer s.mutex.Unlock() + return &DevStat{ + Requests: s.Requests, + Gets: s.Gets, + Errors: s.Errors, + //ReloadLoopsPending: s.ReloadLoopsPending, + GatherTime: s.GatherTime, + GatherDuration: s.GatherDuration, + FltUpdateTime: s.FltUpdateTime, + FltUpdateDuration: s.FltUpdateDuration, + SentDuration: s.SentDuration, } - d.mutex.Lock() - stat := &DevStat{ - Requests: d.Requests, - Gets: d.Gets, - Errors: d.Errors, - ReloadLoopsPending: d.ReloadLoopsPending, - DeviceActive: d.DeviceActive, - DeviceConnected: d.DeviceConnected, - NumMeasurements: len(d.Measurements), - NumMetrics: sum, + +} + +// Send send data to the selfmon device +func (s *DevStat) Send() { + s.mutex.Lock() + defer s.mutex.Unlock() + s.log.Infof("STATS: snmp pooling took [%s] SNMP: Gets [%d] Errors [%d]", s.GatherDuration, s.Gets, s.Errors) + s.log.Infof("STATS: influx send took [%s]", s.SentDuration) + if s.selfmon != nil { + s.selfmon.AddDeviceMetrics(s.id, s.getMetricFields(), s.tagMap) } - d.mutex.Unlock() - return stat } -func (d *SnmpDevice) addRequests(n int64) { - d.mutex.Lock() - defer d.mutex.Unlock() - d.Requests += n +// ResetCounters initialize metric counters +func (s *DevStat) ResetCounters() { + s.mutex.Lock() + defer s.mutex.Unlock() + s.reset() +} + +// AddRequests add num request +func (s *DevStat) AddRequests(n int64) { + s.mutex.Lock() + defer s.mutex.Unlock() + s.Requests += n +} + +// AddGets update Gets +func (s *DevStat) AddGets(n int64) { + s.mutex.Lock() + defer s.mutex.Unlock() + s.Gets += n +} + +// AddErrors update errors +func (s *DevStat) AddErrors(n int64) { + s.mutex.Lock() + defer s.mutex.Unlock() + s.Errors += n } -func (d *SnmpDevice) resetCounters() { - d.mutex.Lock() - defer d.mutex.Unlock() - d.Gets = 0 - d.Errors = 0 +// Update Gather Duration stats +func (s *DevStat) SetGatherDuration(start time.Time, duration time.Duration) { + s.mutex.Lock() + defer s.mutex.Unlock() + s.GatherTime = start + s.GatherDuration = duration } -func (d *SnmpDevice) addGets(n int64) { - d.mutex.Lock() - defer d.mutex.Unlock() - d.Gets += n +// Update Gather Duration stats +func (s *DevStat) AddSentDuration(duration time.Duration) { + s.mutex.Lock() + defer s.mutex.Unlock() + s.SentDuration += duration } -func (d *SnmpDevice) addErrors(n int64) { - d.mutex.Lock() - defer d.mutex.Unlock() - d.Errors += n +func (s *DevStat) SetFltUpdateStats(start time.Time, duration time.Duration) { + s.mutex.Lock() + defer s.mutex.Unlock() + s.FltUpdateTime = start + s.FltUpdateDuration = duration } diff --git a/pkg/agent/selfmon/selfmon.go b/pkg/agent/selfmon/selfmon.go index 8ec5be19..9563dbb7 100644 --- a/pkg/agent/selfmon/selfmon.go +++ b/pkg/agent/selfmon/selfmon.go @@ -62,9 +62,13 @@ func (sm *SelfMon) Init() { } } } + + log.Infof("Self monitoring TAGS inheritance set to : %t", sm.cfg.InheritDeviceTags) + // Measurement Names sm.rt_meas_name = "selfmon_rt" sm.gvm_meas_name = "selfmon_gvm" + if len(sm.cfg.Prefix) > 0 { sm.rt_meas_name = fmt.Sprintf("%sselfmon_rt", sm.cfg.Prefix) sm.gvm_meas_name = fmt.Sprintf("%sselfmon_gvm", sm.cfg.Prefix) @@ -84,6 +88,7 @@ func (sm *SelfMon) Init() { "gc.gc_per_second": 0.0, "gc.gc_per_interval": 0.0, } + sm.chExit = make(chan bool) } @@ -139,7 +144,7 @@ func (sm *SelfMon) addDataPoint(pt *client.Point) { } // AddDeviceMetrics add data from devices -func (sm *SelfMon) AddDeviceMetrics(deviceid string, fields map[string]interface{}) { +func (sm *SelfMon) AddDeviceMetrics(deviceid string, fields map[string]interface{}, devtags map[string]string) { if !sm.IsInitialized() { return } @@ -150,6 +155,12 @@ func (sm *SelfMon) AddDeviceMetrics(deviceid string, fields map[string]interface for k, v := range sm.TagMap { tagMap[k] = v } + if sm.cfg.InheritDeviceTags { + for k, v := range devtags { + tagMap[k] = v + } + } + tagMap["device"] = deviceid now := time.Now() pt, _ := client.NewPoint( diff --git a/pkg/config/mainconfig.go b/pkg/config/mainconfig.go index ab468ddf..c0c1989b 100644 --- a/pkg/config/mainconfig.go +++ b/pkg/config/mainconfig.go @@ -28,10 +28,11 @@ type DatabaseCfg struct { //SelfMonConfig configuration for self monitoring type SelfMonConfig struct { - Enabled bool `toml:"enabled"` - Freq int `toml:"freq"` - Prefix string `toml:"prefix"` - ExtraTags []string `toml:"extra-tags"` + Enabled bool `toml:"enabled"` + Freq int `toml:"freq"` + Prefix string `toml:"prefix"` + InheritDeviceTags bool `toml:"inheritdevicetags"` + ExtraTags []string `toml:"extra-tags"` } //HTTPConfig has webserver config options diff --git a/src/runtime/runtimeview.html b/src/runtime/runtimeview.html index 153612c0..a6c057f8 100644 --- a/src/runtime/runtimeview.html +++ b/src/runtime/runtimeview.html @@ -54,19 +54,19 @@

Gets - + Errors - + Last Gather Duration - + Last Filter Duration - +
{{runtime_dev['Gets']}}{{runtime_dev['Stats'].Gets}}
{{runtime_dev['Errors']}}{{runtime_dev['Stats'].Errors}}
{{runtime_dev['LastGatherDuration'] / 1000000000 }}{{runtime_dev['Stats'].LastGatherDuration / 1000000000 }}
{{runtime_dev['LastFltUpdateDuration'] / 1000000000 }}{{runtime_dev['Stats'].LastFltUpdateDuration / 1000000000 }}