diff --git a/.gitignore b/.gitignore index ae8412e62..e556ff012 100644 --- a/.gitignore +++ b/.gitignore @@ -10,3 +10,4 @@ ping choria-*-*-* *.rpm *.tgz +debug diff --git a/agents/rpcutil/rpcutil.go b/agents/rpcutil/rpcutil.go index 03a9f65f2..acfe73c5a 100644 --- a/agents/rpcutil/rpcutil.go +++ b/agents/rpcutil/rpcutil.go @@ -71,16 +71,16 @@ type DaemonStatsReply struct { Agents []string `json:"agents"` PID int `json:"pid"` Times CPUTimes `json:"times"` - Validated int `json:"validated"` - Unvalidated int `json:"unvalidated"` - Passed int `json:"passed"` - Filtered int `json:"filtered"` StartTime int64 `json:"starttime"` Total int `json:"total"` - Replies int `json:"replies"` ConfigFile string `json:"configfile"` Version string `json:"version"` - TTLExpired int `json:"ttlexpired"` + Validated float64 `json:"validated"` + Unvalidated float64 `json:"unvalidated"` + Passed float64 `json:"passed"` + Filtered float64 `json:"filtered"` + Replies float64 `json:"replies"` + TTLExpired float64 `json:"ttlexpired"` } // New creates a new rpcutil agent @@ -117,14 +117,22 @@ func New(mgr *agents.Manager) (*mcorpc.Agent, error) { } func daemonStatsAction(ctx context.Context, req *mcorpc.Request, reply *mcorpc.Reply, agent *mcorpc.Agent, conn choria.ConnectorInfo) { + stats := agent.ServerInfoSource.Stats() + output := &DaemonStatsReply{ - Procs: []string{fmt.Sprintf("Go %s with %d go procs on %d cores", runtime.Version(), runtime.NumGoroutine(), runtime.NumCPU())}, - Agents: agent.ServerInfoSource.KnownAgents(), - PID: os.Getpid(), - Times: CPUTimes{}, - ConfigFile: agent.ServerInfoSource.ConfigFile(), - Version: build.Version, - StartTime: agent.ServerInfoSource.StartTime().Unix(), + Procs: []string{fmt.Sprintf("Go %s with %d go procs on %d cores", runtime.Version(), runtime.NumGoroutine(), runtime.NumCPU())}, + Agents: agent.ServerInfoSource.KnownAgents(), + PID: os.Getpid(), + Times: CPUTimes{}, + ConfigFile: agent.ServerInfoSource.ConfigFile(), + Version: build.Version, + StartTime: agent.ServerInfoSource.StartTime().Unix(), + Validated: stats.Valid, + Unvalidated: stats.Invalid, + Passed: stats.Passed, + Filtered: stats.Filtered, + Replies: stats.Replies, + TTLExpired: stats.TTLExpired, } reply.Data = output } diff --git a/choria/message.go b/choria/message.go index 7b9d8d8cb..b5b7485c0 100644 --- a/choria/message.go +++ b/choria/message.go @@ -4,6 +4,7 @@ import ( "encoding/base64" "errors" "fmt" + "time" "github.com/choria-io/go-protocol/protocol" ) @@ -13,9 +14,10 @@ type Message struct { Payload string Agent string - Request *Message - Filter *protocol.Filter - TTL int + Request *Message + Filter *protocol.Filter + TTL int + TimeStamp time.Time SenderID string CallerID string @@ -52,6 +54,7 @@ func NewMessageFromRequest(req protocol.Request, replyto string, choria *Framewo msg.RequestID = req.RequestID() msg.TTL = req.TTL() + msg.TimeStamp = req.Time() msg.Filter, _ = req.Filter() msg.SenderID = choria.Config.Identity msg.SetBase64Payload(req.Message()) @@ -96,9 +99,6 @@ func NewMessage(payload string, agent string, collective string, msgType string, } _, err = msg.Validate() - if err != nil { - return - } return } @@ -170,6 +170,13 @@ func (self *Message) Validate() (bool, error) { return true, nil } +// ValidateTTL validates the message age +func (self *Message) ValidateTTL() bool { + earliest := time.Now().Add(-1 * time.Duration(self.TTL)) + + return self.TimeStamp.Before(earliest) +} + // SetBase64Payload sets the payload for the message, use it if the payload is Base64 encoded func (self *Message) SetBase64Payload(payload string) error { str, err := base64.StdEncoding.DecodeString(payload) diff --git a/server/agents/agents.go b/server/agents/agents.go index 1171f6356..98ee756a9 100644 --- a/server/agents/agents.go +++ b/server/agents/agents.go @@ -28,6 +28,16 @@ type ServerInfoSource interface { Classes() []string Facts() json.RawMessage StartTime() time.Time + Stats() ServerStats +} + +type ServerStats struct { + Valid float64 + Invalid float64 + Passed float64 + Filtered float64 + Replies float64 + TTLExpired float64 } type AgentReply struct { diff --git a/server/agents/agents_test.go b/server/agents/agents_test.go index fb83aaf37..5d67f4ec1 100644 --- a/server/agents/agents_test.go +++ b/server/agents/agents_test.go @@ -80,6 +80,10 @@ func (si *stubsi) StartTime() time.Time { return time.Now() } +func (si *stubsi) Stats() ServerStats { + return ServerStats{} +} + func TestFileContent(t *testing.T) { RegisterFailHandler(Fail) RunSpecs(t, "Server/Agents") diff --git a/server/infosource.go b/server/infosource.go index 2666725ac..02b4291ab 100644 --- a/server/infosource.go +++ b/server/infosource.go @@ -7,6 +7,9 @@ import ( "github.com/choria-io/go-choria/server/agents" "github.com/choria-io/go-choria/server/discovery/classes" "github.com/choria-io/go-choria/server/discovery/facts" + + "github.com/prometheus/client_golang/prometheus" + dto "github.com/prometheus/client_model/go" ) // KnownAgents is a list of agents loaded into the server instance @@ -50,3 +53,28 @@ func (srv *Instance) Facts() json.RawMessage { func (srv *Instance) StartTime() time.Time { return srv.startTime } + +func (srv *Instance) Stats() agents.ServerStats { + return agents.ServerStats{ + Valid: srv.getPromCtrValue(validatedCtr), + Invalid: srv.getPromCtrValue(unvalidatedCtr), + Passed: srv.getPromCtrValue(passedCtr), + Filtered: srv.getPromCtrValue(filteredCtr), + Replies: srv.getPromCtrValue(repliesCtr), + TTLExpired: srv.getPromCtrValue(ttlExpiredCtr), + } +} + +func (srv *Instance) getPromCtrValue(ctr *prometheus.CounterVec) float64 { + pb := &dto.Metric{} + m, err := ctr.GetMetricWithLabelValues(srv.cfg.Identity) + if err != nil { + return 0 + } + + if m.Write(pb) != nil { + return 0 + } + + return pb.GetCounter().GetValue() +} diff --git a/server/processor.go b/server/processor.go index 956a99319..1b4a42163 100644 --- a/server/processor.go +++ b/server/processor.go @@ -15,38 +15,52 @@ func (srv *Instance) handleRawMessage(ctx context.Context, wg *sync.WaitGroup, r transport, err := srv.fw.NewTransportFromJSON(string(rawmsg.Data)) if err != nil { - srv.log.Errorf("Could not deceode message into transport: %s", err.Error()) + srv.log.Errorf("Could not deceode message into transport: %s", err) + unvalidatedCtr.WithLabelValues(srv.cfg.Identity).Inc() return } sreq, err := srv.fw.NewSecureRequestFromTransport(transport, false) if err != nil { - srv.log.Errorf("Could not decode incoming request: %s", err.Error()) + unvalidatedCtr.WithLabelValues(srv.cfg.Identity).Inc() + srv.log.Errorf("Could not decode incoming request: %s", err) return } req, err := srv.fw.NewRequestFromSecureRequest(sreq) if err != nil { - srv.log.Errorf("Could not decode secure request: %s", err.Error()) + unvalidatedCtr.WithLabelValues(srv.cfg.Identity).Inc() + srv.log.Errorf("Could not decode secure request: %s", err) return } protocol.CopyFederationData(transport, req) if !srv.discovery.ShouldProcess(req, srv.agents.KnownAgents()) { + filteredCtr.WithLabelValues(srv.cfg.Identity).Inc() srv.log.Debugf("Skipping message %s that does not match local properties", req.RequestID()) return } + passedCtr.WithLabelValues(srv.cfg.Identity).Inc() + msg, err = choria.NewMessageFromRequest(req, transport.ReplyTo(), srv.fw) if err != nil { - srv.log.Errorf("Could not create Message: %s", err.Error()) + unvalidatedCtr.WithLabelValues(srv.cfg.Identity).Inc() + srv.log.Errorf("Could not create Message: %s", err) + return + } + + if !msg.ValidateTTL() { + ttlExpiredCtr.WithLabelValues(srv.cfg.Identity).Inc() + srv.log.Errorf("Message %s created at %s is too old, TTL is %d", msg.String(), msg.TimeStamp, msg.TTL) return } + validatedCtr.WithLabelValues(srv.cfg.Identity).Inc() + wg.Add(1) go srv.agents.Dispatch(ctx, wg, replies, msg, req) - } func (srv *Instance) handleReply(reply *agents.AgentReply) { @@ -57,7 +71,7 @@ func (srv *Instance) handleReply(reply *agents.AgentReply) { msg, err := choria.NewMessageFromRequest(reply.Request, reply.Message.ReplyTo(), srv.fw) if err != nil { - srv.log.Errorf("Cannot create reply Message for %s: %s", reply.Message.RequestID, err.Error()) + srv.log.Errorf("Cannot create reply Message for %s: %s", reply.Message.RequestID, err) return } @@ -65,7 +79,7 @@ func (srv *Instance) handleReply(reply *agents.AgentReply) { err = srv.connector.Publish(msg) if err != nil { - srv.log.Errorf("Publishing reply Message for %s failed: %s", reply.Message.RequestID, err.Error()) + srv.log.Errorf("Publishing reply Message for %s failed: %s", reply.Message.RequestID, err) } } diff --git a/server/serverinfotest/stubsi.go b/server/serverinfotest/stubsi.go index 6bf2291db..5d67530f0 100644 --- a/server/serverinfotest/stubsi.go +++ b/server/serverinfotest/stubsi.go @@ -40,3 +40,7 @@ func (si *InfoSource) Facts() json.RawMessage { func (si *InfoSource) StartTime() time.Time { return time.Now() } + +func (si *InfoSource) Stats() agents.ServerStats { + return agents.ServerStats{} +} diff --git a/server/stats.go b/server/stats.go new file mode 100644 index 000000000..417cad6e4 --- /dev/null +++ b/server/stats.go @@ -0,0 +1,46 @@ +package server + +import ( + "github.com/prometheus/client_golang/prometheus" +) + +var ( + validatedCtr = prometheus.NewCounterVec(prometheus.CounterOpts{ + Name: "choria_server_validated", + Help: "Number of messages that were received and validated succesfully", + }, []string{"identity"}) + + unvalidatedCtr = prometheus.NewCounterVec(prometheus.CounterOpts{ + Name: "choria_server_unvalidated", + Help: "Number of messages that were received but did not pass validation", + }, []string{"identity"}) + + passedCtr = prometheus.NewCounterVec(prometheus.CounterOpts{ + Name: "choria_server_passed", + Help: "Number of messages where this instance matched the filter expression", + }, []string{"identity"}) + + filteredCtr = prometheus.NewCounterVec(prometheus.CounterOpts{ + Name: "choria_server_filtered", + Help: "Number of messages where this instance did not match the filter expression", + }, []string{"identity"}) + + repliesCtr = prometheus.NewCounterVec(prometheus.CounterOpts{ + Name: "choria_server_replies", + Help: "Number of reply messages that were produced", + }, []string{"identity"}) + + ttlExpiredCtr = prometheus.NewCounterVec(prometheus.CounterOpts{ + Name: "choria_server_ttlexpired", + Help: "Number of messages received that were too old and dropped", + }, []string{"identity"}) +) + +func init() { + prometheus.MustRegister(validatedCtr) + prometheus.MustRegister(unvalidatedCtr) + prometheus.MustRegister(passedCtr) + prometheus.MustRegister(filteredCtr) + prometheus.MustRegister(repliesCtr) + prometheus.MustRegister(ttlExpiredCtr) +}