Skip to content

Commit

Permalink
Merge pull request #149 from ripienaar/146
Browse files Browse the repository at this point in the history
(#146, #148) Record server stats and check TTLs
  • Loading branch information
ripienaar authored Jan 20, 2018
2 parents 1af974f + e1cbb2f commit 2213ca8
Show file tree
Hide file tree
Showing 9 changed files with 148 additions and 26 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,4 @@ ping
choria-*-*-*
*.rpm
*.tgz
debug
34 changes: 21 additions & 13 deletions agents/rpcutil/rpcutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,16 +71,16 @@ type DaemonStatsReply struct {
Agents []string `json:"agents"`
PID int `json:"pid"`
Times CPUTimes `json:"times"`
Validated int `json:"validated"`
Unvalidated int `json:"unvalidated"`
Passed int `json:"passed"`
Filtered int `json:"filtered"`
StartTime int64 `json:"starttime"`
Total int `json:"total"`
Replies int `json:"replies"`
ConfigFile string `json:"configfile"`
Version string `json:"version"`
TTLExpired int `json:"ttlexpired"`
Validated float64 `json:"validated"`
Unvalidated float64 `json:"unvalidated"`
Passed float64 `json:"passed"`
Filtered float64 `json:"filtered"`
Replies float64 `json:"replies"`
TTLExpired float64 `json:"ttlexpired"`
}

// New creates a new rpcutil agent
Expand Down Expand Up @@ -117,14 +117,22 @@ func New(mgr *agents.Manager) (*mcorpc.Agent, error) {
}

// daemonStatsAction fills reply.Data with a DaemonStatsReply describing the
// running server: Go runtime details, the known agents, the process id, the
// config file, the build version, the start time and the message counters
// read from agent.ServerInfoSource.Stats().
//
// The ctx, req and conn arguments are not used by this action.
func daemonStatsAction(ctx context.Context, req *mcorpc.Request, reply *mcorpc.Reply, agent *mcorpc.Agent, conn choria.ConnectorInfo) {
stats := agent.ServerInfoSource.Stats()

output := &DaemonStatsReply{
// NOTE(review): the seven fields from Procs to StartTime are listed twice
// below; this looks like the removed and re-added sides of a diff shown
// together - confirm and keep only one set, since duplicate field names
// in a composite literal do not compile.
Procs: []string{fmt.Sprintf("Go %s with %d go procs on %d cores", runtime.Version(), runtime.NumGoroutine(), runtime.NumCPU())},
Agents: agent.ServerInfoSource.KnownAgents(),
PID: os.Getpid(),
Times: CPUTimes{},
ConfigFile: agent.ServerInfoSource.ConfigFile(),
Version: build.Version,
StartTime: agent.ServerInfoSource.StartTime().Unix(),
Procs: []string{fmt.Sprintf("Go %s with %d go procs on %d cores", runtime.Version(), runtime.NumGoroutine(), runtime.NumCPU())},
Agents: agent.ServerInfoSource.KnownAgents(),
PID: os.Getpid(),
Times: CPUTimes{},
ConfigFile: agent.ServerInfoSource.ConfigFile(),
Version: build.Version,
StartTime: agent.ServerInfoSource.StartTime().Unix(),
// Message counters copied from the server statistics snapshot taken above.
Validated: stats.Valid,
Unvalidated: stats.Invalid,
Passed: stats.Passed,
Filtered: stats.Filtered,
Replies: stats.Replies,
TTLExpired: stats.TTLExpired,
}
reply.Data = output
}
Expand Down
19 changes: 13 additions & 6 deletions choria/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"encoding/base64"
"errors"
"fmt"
"time"

"github.com/choria-io/go-protocol/protocol"
)
Expand All @@ -13,9 +14,10 @@ type Message struct {
Payload string
Agent string

Request *Message
Filter *protocol.Filter
TTL int
Request *Message
Filter *protocol.Filter
TTL int
TimeStamp time.Time

SenderID string
CallerID string
Expand Down Expand Up @@ -52,6 +54,7 @@ func NewMessageFromRequest(req protocol.Request, replyto string, choria *Framewo

msg.RequestID = req.RequestID()
msg.TTL = req.TTL()
msg.TimeStamp = req.Time()
msg.Filter, _ = req.Filter()
msg.SenderID = choria.Config.Identity
msg.SetBase64Payload(req.Message())
Expand Down Expand Up @@ -96,9 +99,6 @@ func NewMessage(payload string, agent string, collective string, msgType string,
}

_, err = msg.Validate()
if err != nil {
return
}

return
}
Expand Down Expand Up @@ -170,6 +170,13 @@ func (self *Message) Validate() (bool, error) {
return true, nil
}

// ValidateTTL reports whether the message is still within its time to live.
//
// It returns true while TimeStamp is no older than TTL seconds before the
// current time, and false once the message has expired. Callers are expected
// to drop the message when false is returned.
func (self *Message) ValidateTTL() bool {
	// TTL is expressed in seconds; without the time.Second factor the value
	// would be interpreted as nanoseconds and the cutoff would be "now".
	maxAge := time.Duration(self.TTL) * time.Second
	earliest := time.Now().Add(-maxAge)

	// A timestamp before the cutoff means the message has outlived its TTL.
	return !self.TimeStamp.Before(earliest)
}

// SetBase64Payload sets the payload for the message, use it if the payload is Base64 encoded
func (self *Message) SetBase64Payload(payload string) error {
str, err := base64.StdEncoding.DecodeString(payload)
Expand Down
10 changes: 10 additions & 0 deletions server/agents/agents.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,16 @@ type ServerInfoSource interface {
Classes() []string
Facts() json.RawMessage
StartTime() time.Time
Stats() ServerStats
}

// ServerStats is a snapshot of the message counters kept by a server.
// Field order is significant for positional composite literals.
type ServerStats struct {
	// Valid and Invalid count received messages that did, or did not,
	// pass validation.
	Valid, Invalid float64

	// Passed and Filtered count messages that did, or did not, match the
	// filter for this server.
	Passed, Filtered float64

	// Replies counts reply messages produced; TTLExpired counts messages
	// dropped for being too old.
	Replies, TTLExpired float64
}

type AgentReply struct {
Expand Down
4 changes: 4 additions & 0 deletions server/agents/agents_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,10 @@ func (si *stubsi) StartTime() time.Time {
return time.Now()
}

// Stats returns zero valued server statistics for the stub info source.
func (si *stubsi) Stats() ServerStats {
	var empty ServerStats

	return empty
}

func TestFileContent(t *testing.T) {
RegisterFailHandler(Fail)
RunSpecs(t, "Server/Agents")
Expand Down
28 changes: 28 additions & 0 deletions server/infosource.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ import (
"github.com/choria-io/go-choria/server/agents"
"github.com/choria-io/go-choria/server/discovery/classes"
"github.com/choria-io/go-choria/server/discovery/facts"

"github.com/prometheus/client_golang/prometheus"
dto "github.com/prometheus/client_model/go"
)

// KnownAgents is a list of agents loaded into the server instance
Expand Down Expand Up @@ -50,3 +53,28 @@ func (srv *Instance) Facts() json.RawMessage {
// StartTime returns the start time recorded on the instance.
func (srv *Instance) StartTime() time.Time {
	started := srv.startTime

	return started
}

// Stats gathers the current values of the server message counters for this
// instance into an agents.ServerStats.
func (srv *Instance) Stats() agents.ServerStats {
	var stats agents.ServerStats

	stats.Valid = srv.getPromCtrValue(validatedCtr)
	stats.Invalid = srv.getPromCtrValue(unvalidatedCtr)
	stats.Passed = srv.getPromCtrValue(passedCtr)
	stats.Filtered = srv.getPromCtrValue(filteredCtr)
	stats.Replies = srv.getPromCtrValue(repliesCtr)
	stats.TTLExpired = srv.getPromCtrValue(ttlExpiredCtr)

	return stats
}

// getPromCtrValue reads the current value of ctr for the series labelled with
// this instance's configured identity.
//
// It returns 0 when the labelled metric cannot be obtained or cannot be
// written out for inspection.
func (srv *Instance) getPromCtrValue(ctr *prometheus.CounterVec) float64 {
	counter, err := ctr.GetMetricWithLabelValues(srv.cfg.Identity)
	if err != nil {
		return 0
	}

	// The counter value is only reachable by writing the metric into its
	// protobuf representation.
	var snapshot dto.Metric
	if werr := counter.Write(&snapshot); werr != nil {
		return 0
	}

	return snapshot.GetCounter().GetValue()
}
28 changes: 21 additions & 7 deletions server/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,38 +15,52 @@ func (srv *Instance) handleRawMessage(ctx context.Context, wg *sync.WaitGroup, r

transport, err := srv.fw.NewTransportFromJSON(string(rawmsg.Data))
if err != nil {
srv.log.Errorf("Could not deceode message into transport: %s", err.Error())
srv.log.Errorf("Could not deceode message into transport: %s", err)
unvalidatedCtr.WithLabelValues(srv.cfg.Identity).Inc()
return
}

sreq, err := srv.fw.NewSecureRequestFromTransport(transport, false)
if err != nil {
srv.log.Errorf("Could not decode incoming request: %s", err.Error())
unvalidatedCtr.WithLabelValues(srv.cfg.Identity).Inc()
srv.log.Errorf("Could not decode incoming request: %s", err)
return
}

req, err := srv.fw.NewRequestFromSecureRequest(sreq)
if err != nil {
srv.log.Errorf("Could not decode secure request: %s", err.Error())
unvalidatedCtr.WithLabelValues(srv.cfg.Identity).Inc()
srv.log.Errorf("Could not decode secure request: %s", err)
return
}

protocol.CopyFederationData(transport, req)

if !srv.discovery.ShouldProcess(req, srv.agents.KnownAgents()) {
filteredCtr.WithLabelValues(srv.cfg.Identity).Inc()
srv.log.Debugf("Skipping message %s that does not match local properties", req.RequestID())
return
}

passedCtr.WithLabelValues(srv.cfg.Identity).Inc()

msg, err = choria.NewMessageFromRequest(req, transport.ReplyTo(), srv.fw)
if err != nil {
srv.log.Errorf("Could not create Message: %s", err.Error())
unvalidatedCtr.WithLabelValues(srv.cfg.Identity).Inc()
srv.log.Errorf("Could not create Message: %s", err)
return
}

if !msg.ValidateTTL() {
ttlExpiredCtr.WithLabelValues(srv.cfg.Identity).Inc()
srv.log.Errorf("Message %s created at %s is too old, TTL is %d", msg.String(), msg.TimeStamp, msg.TTL)
return
}

validatedCtr.WithLabelValues(srv.cfg.Identity).Inc()

wg.Add(1)
go srv.agents.Dispatch(ctx, wg, replies, msg, req)

}

func (srv *Instance) handleReply(reply *agents.AgentReply) {
Expand All @@ -57,15 +71,15 @@ func (srv *Instance) handleReply(reply *agents.AgentReply) {

msg, err := choria.NewMessageFromRequest(reply.Request, reply.Message.ReplyTo(), srv.fw)
if err != nil {
srv.log.Errorf("Cannot create reply Message for %s: %s", reply.Message.RequestID, err.Error())
srv.log.Errorf("Cannot create reply Message for %s: %s", reply.Message.RequestID, err)
return
}

msg.Payload = string(reply.Body)

err = srv.connector.Publish(msg)
if err != nil {
srv.log.Errorf("Publishing reply Message for %s failed: %s", reply.Message.RequestID, err.Error())
srv.log.Errorf("Publishing reply Message for %s failed: %s", reply.Message.RequestID, err)
}

}
Expand Down
4 changes: 4 additions & 0 deletions server/serverinfotest/stubsi.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,3 +40,7 @@ func (si *InfoSource) Facts() json.RawMessage {
// StartTime returns the current time; the stub does not track a real start time.
func (si *InfoSource) StartTime() time.Time {
	now := time.Now()

	return now
}

// Stats returns zero valued server statistics.
func (si *InfoSource) Stats() agents.ServerStats {
	var none agents.ServerStats

	return none
}
46 changes: 46 additions & 0 deletions server/stats.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package server

import (
"github.com/prometheus/client_golang/prometheus"
)

// Message processing counters. Each counter vector has a single "identity"
// label so values can be recorded and read back per server identity.
var (
	// validatedCtr counts messages that were received and validated successfully.
	validatedCtr = prometheus.NewCounterVec(prometheus.CounterOpts{
		Name: "choria_server_validated",
		Help: "Number of messages that were received and validated successfully",
	}, []string{"identity"})

	// unvalidatedCtr counts messages that were received but failed validation.
	unvalidatedCtr = prometheus.NewCounterVec(prometheus.CounterOpts{
		Name: "choria_server_unvalidated",
		Help: "Number of messages that were received but did not pass validation",
	}, []string{"identity"})

	// passedCtr counts messages whose filter expression matched this instance.
	passedCtr = prometheus.NewCounterVec(prometheus.CounterOpts{
		Name: "choria_server_passed",
		Help: "Number of messages where this instance matched the filter expression",
	}, []string{"identity"})

	// filteredCtr counts messages whose filter expression did not match this instance.
	filteredCtr = prometheus.NewCounterVec(prometheus.CounterOpts{
		Name: "choria_server_filtered",
		Help: "Number of messages where this instance did not match the filter expression",
	}, []string{"identity"})

	// repliesCtr counts reply messages that were produced.
	repliesCtr = prometheus.NewCounterVec(prometheus.CounterOpts{
		Name: "choria_server_replies",
		Help: "Number of reply messages that were produced",
	}, []string{"identity"})

	// ttlExpiredCtr counts received messages that were too old and were dropped.
	ttlExpiredCtr = prometheus.NewCounterVec(prometheus.CounterOpts{
		Name: "choria_server_ttlexpired",
		Help: "Number of messages received that were too old and dropped",
	}, []string{"identity"})
)

// init registers every server message counter with the default Prometheus
// registry. MustRegister panics if any registration fails.
func init() {
	prometheus.MustRegister(
		validatedCtr,
		unvalidatedCtr,
		passedCtr,
		filteredCtr,
		repliesCtr,
		ttlExpiredCtr,
	)
}

0 comments on commit 2213ca8

Please sign in to comment.