Skip to content

Commit

Permalink
(#146, #148) Record server stats and check TTLs
Browse files Browse the repository at this point in the history
Primarily this records a number of server stats in line with what
mcollective recorded so the rpcutil agent can be more compatible

In the process it was observed that TTLs are not checked, they are
now checked too
  • Loading branch information
ripienaar committed Jan 20, 2018
1 parent 1af974f commit e1cbb2f
Show file tree
Hide file tree
Showing 9 changed files with 148 additions and 26 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,4 @@ ping
choria-*-*-*
*.rpm
*.tgz
debug
34 changes: 21 additions & 13 deletions agents/rpcutil/rpcutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,16 +71,16 @@ type DaemonStatsReply struct {
Agents []string `json:"agents"`
PID int `json:"pid"`
Times CPUTimes `json:"times"`
Validated int `json:"validated"`
Unvalidated int `json:"unvalidated"`
Passed int `json:"passed"`
Filtered int `json:"filtered"`
StartTime int64 `json:"starttime"`
Total int `json:"total"`
Replies int `json:"replies"`
ConfigFile string `json:"configfile"`
Version string `json:"version"`
TTLExpired int `json:"ttlexpired"`
Validated float64 `json:"validated"`
Unvalidated float64 `json:"unvalidated"`
Passed float64 `json:"passed"`
Filtered float64 `json:"filtered"`
Replies float64 `json:"replies"`
TTLExpired float64 `json:"ttlexpired"`
}

// New creates a new rpcutil agent
Expand Down Expand Up @@ -117,14 +117,22 @@ func New(mgr *agents.Manager) (*mcorpc.Agent, error) {
}

func daemonStatsAction(ctx context.Context, req *mcorpc.Request, reply *mcorpc.Reply, agent *mcorpc.Agent, conn choria.ConnectorInfo) {
stats := agent.ServerInfoSource.Stats()

output := &DaemonStatsReply{
Procs: []string{fmt.Sprintf("Go %s with %d go procs on %d cores", runtime.Version(), runtime.NumGoroutine(), runtime.NumCPU())},
Agents: agent.ServerInfoSource.KnownAgents(),
PID: os.Getpid(),
Times: CPUTimes{},
ConfigFile: agent.ServerInfoSource.ConfigFile(),
Version: build.Version,
StartTime: agent.ServerInfoSource.StartTime().Unix(),
Procs: []string{fmt.Sprintf("Go %s with %d go procs on %d cores", runtime.Version(), runtime.NumGoroutine(), runtime.NumCPU())},
Agents: agent.ServerInfoSource.KnownAgents(),
PID: os.Getpid(),
Times: CPUTimes{},
ConfigFile: agent.ServerInfoSource.ConfigFile(),
Version: build.Version,
StartTime: agent.ServerInfoSource.StartTime().Unix(),
Validated: stats.Valid,
Unvalidated: stats.Invalid,
Passed: stats.Passed,
Filtered: stats.Filtered,
Replies: stats.Replies,
TTLExpired: stats.TTLExpired,
}
reply.Data = output
}
Expand Down
19 changes: 13 additions & 6 deletions choria/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"encoding/base64"
"errors"
"fmt"
"time"

"github.com/choria-io/go-protocol/protocol"
)
Expand All @@ -13,9 +14,10 @@ type Message struct {
Payload string
Agent string

Request *Message
Filter *protocol.Filter
TTL int
Request *Message
Filter *protocol.Filter
TTL int
TimeStamp time.Time

SenderID string
CallerID string
Expand Down Expand Up @@ -52,6 +54,7 @@ func NewMessageFromRequest(req protocol.Request, replyto string, choria *Framewo

msg.RequestID = req.RequestID()
msg.TTL = req.TTL()
msg.TimeStamp = req.Time()
msg.Filter, _ = req.Filter()
msg.SenderID = choria.Config.Identity
msg.SetBase64Payload(req.Message())
Expand Down Expand Up @@ -96,9 +99,6 @@ func NewMessage(payload string, agent string, collective string, msgType string,
}

_, err = msg.Validate()
if err != nil {
return
}

return
}
Expand Down Expand Up @@ -170,6 +170,13 @@ func (self *Message) Validate() (bool, error) {
return true, nil
}

// ValidateTTL validates the message age
func (self *Message) ValidateTTL() bool {
earliest := time.Now().Add(-1 * time.Duration(self.TTL))

return self.TimeStamp.Before(earliest)
}

// SetBase64Payload sets the payload for the message, use it if the payload is Base64 encoded
func (self *Message) SetBase64Payload(payload string) error {
str, err := base64.StdEncoding.DecodeString(payload)
Expand Down
10 changes: 10 additions & 0 deletions server/agents/agents.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,16 @@ type ServerInfoSource interface {
Classes() []string
Facts() json.RawMessage
StartTime() time.Time
Stats() ServerStats
}

type ServerStats struct {
Valid float64
Invalid float64
Passed float64
Filtered float64
Replies float64
TTLExpired float64
}

type AgentReply struct {
Expand Down
4 changes: 4 additions & 0 deletions server/agents/agents_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,10 @@ func (si *stubsi) StartTime() time.Time {
return time.Now()
}

func (si *stubsi) Stats() ServerStats {
return ServerStats{}
}

func TestFileContent(t *testing.T) {
RegisterFailHandler(Fail)
RunSpecs(t, "Server/Agents")
Expand Down
28 changes: 28 additions & 0 deletions server/infosource.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ import (
"github.com/choria-io/go-choria/server/agents"
"github.com/choria-io/go-choria/server/discovery/classes"
"github.com/choria-io/go-choria/server/discovery/facts"

"github.com/prometheus/client_golang/prometheus"
dto "github.com/prometheus/client_model/go"
)

// KnownAgents is a list of agents loaded into the server instance
Expand Down Expand Up @@ -50,3 +53,28 @@ func (srv *Instance) Facts() json.RawMessage {
func (srv *Instance) StartTime() time.Time {
return srv.startTime
}

func (srv *Instance) Stats() agents.ServerStats {
return agents.ServerStats{
Valid: srv.getPromCtrValue(validatedCtr),
Invalid: srv.getPromCtrValue(unvalidatedCtr),
Passed: srv.getPromCtrValue(passedCtr),
Filtered: srv.getPromCtrValue(filteredCtr),
Replies: srv.getPromCtrValue(repliesCtr),
TTLExpired: srv.getPromCtrValue(ttlExpiredCtr),
}
}

func (srv *Instance) getPromCtrValue(ctr *prometheus.CounterVec) float64 {
pb := &dto.Metric{}
m, err := ctr.GetMetricWithLabelValues(srv.cfg.Identity)
if err != nil {
return 0
}

if m.Write(pb) != nil {
return 0
}

return pb.GetCounter().GetValue()
}
28 changes: 21 additions & 7 deletions server/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,38 +15,52 @@ func (srv *Instance) handleRawMessage(ctx context.Context, wg *sync.WaitGroup, r

transport, err := srv.fw.NewTransportFromJSON(string(rawmsg.Data))
if err != nil {
srv.log.Errorf("Could not deceode message into transport: %s", err.Error())
srv.log.Errorf("Could not deceode message into transport: %s", err)
unvalidatedCtr.WithLabelValues(srv.cfg.Identity).Inc()
return
}

sreq, err := srv.fw.NewSecureRequestFromTransport(transport, false)
if err != nil {
srv.log.Errorf("Could not decode incoming request: %s", err.Error())
unvalidatedCtr.WithLabelValues(srv.cfg.Identity).Inc()
srv.log.Errorf("Could not decode incoming request: %s", err)
return
}

req, err := srv.fw.NewRequestFromSecureRequest(sreq)
if err != nil {
srv.log.Errorf("Could not decode secure request: %s", err.Error())
unvalidatedCtr.WithLabelValues(srv.cfg.Identity).Inc()
srv.log.Errorf("Could not decode secure request: %s", err)
return
}

protocol.CopyFederationData(transport, req)

if !srv.discovery.ShouldProcess(req, srv.agents.KnownAgents()) {
filteredCtr.WithLabelValues(srv.cfg.Identity).Inc()
srv.log.Debugf("Skipping message %s that does not match local properties", req.RequestID())
return
}

passedCtr.WithLabelValues(srv.cfg.Identity).Inc()

msg, err = choria.NewMessageFromRequest(req, transport.ReplyTo(), srv.fw)
if err != nil {
srv.log.Errorf("Could not create Message: %s", err.Error())
unvalidatedCtr.WithLabelValues(srv.cfg.Identity).Inc()
srv.log.Errorf("Could not create Message: %s", err)
return
}

if !msg.ValidateTTL() {
ttlExpiredCtr.WithLabelValues(srv.cfg.Identity).Inc()
srv.log.Errorf("Message %s created at %s is too old, TTL is %d", msg.String(), msg.TimeStamp, msg.TTL)
return
}

validatedCtr.WithLabelValues(srv.cfg.Identity).Inc()

wg.Add(1)
go srv.agents.Dispatch(ctx, wg, replies, msg, req)

}

func (srv *Instance) handleReply(reply *agents.AgentReply) {
Expand All @@ -57,15 +71,15 @@ func (srv *Instance) handleReply(reply *agents.AgentReply) {

msg, err := choria.NewMessageFromRequest(reply.Request, reply.Message.ReplyTo(), srv.fw)
if err != nil {
srv.log.Errorf("Cannot create reply Message for %s: %s", reply.Message.RequestID, err.Error())
srv.log.Errorf("Cannot create reply Message for %s: %s", reply.Message.RequestID, err)
return
}

msg.Payload = string(reply.Body)

err = srv.connector.Publish(msg)
if err != nil {
srv.log.Errorf("Publishing reply Message for %s failed: %s", reply.Message.RequestID, err.Error())
srv.log.Errorf("Publishing reply Message for %s failed: %s", reply.Message.RequestID, err)
}

}
Expand Down
4 changes: 4 additions & 0 deletions server/serverinfotest/stubsi.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,3 +40,7 @@ func (si *InfoSource) Facts() json.RawMessage {
func (si *InfoSource) StartTime() time.Time {
return time.Now()
}

func (si *InfoSource) Stats() agents.ServerStats {
return agents.ServerStats{}
}
46 changes: 46 additions & 0 deletions server/stats.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package server

import (
"github.com/prometheus/client_golang/prometheus"
)

var (
validatedCtr = prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "choria_server_validated",
Help: "Number of messages that were received and validated succesfully",
}, []string{"identity"})

unvalidatedCtr = prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "choria_server_unvalidated",
Help: "Number of messages that were received but did not pass validation",
}, []string{"identity"})

passedCtr = prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "choria_server_passed",
Help: "Number of messages where this instance matched the filter expression",
}, []string{"identity"})

filteredCtr = prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "choria_server_filtered",
Help: "Number of messages where this instance did not match the filter expression",
}, []string{"identity"})

repliesCtr = prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "choria_server_replies",
Help: "Number of reply messages that were produced",
}, []string{"identity"})

ttlExpiredCtr = prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "choria_server_ttlexpired",
Help: "Number of messages received that were too old and dropped",
}, []string{"identity"})
)

func init() {
prometheus.MustRegister(validatedCtr)
prometheus.MustRegister(unvalidatedCtr)
prometheus.MustRegister(passedCtr)
prometheus.MustRegister(filteredCtr)
prometheus.MustRegister(repliesCtr)
prometheus.MustRegister(ttlExpiredCtr)
}

0 comments on commit e1cbb2f

Please sign in to comment.