Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/dmq list #22

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 60 additions & 10 deletions collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package main
import (
"fmt"
"log"
"log/syslog"
"net"
"net/url"
"regexp"
Expand Down Expand Up @@ -124,6 +125,12 @@ type DispatcherTarget struct {
SetID int
}

// DMQNode is a peer for the dmq module
type DMQNode struct {
Host string
Status string
}

const (
namespace = "kamailio"
)
Expand All @@ -143,6 +150,7 @@ var (
"dispatcher.list",
"tls.info",
"dlg.stats_active",
"dmq.list_nodes",
}

metricsList = map[string][]Metric{
Expand Down Expand Up @@ -195,6 +203,9 @@ var (
NewMetricGauge("ongoing", "Dialogs ongoing.", "dlg.stats_active"),
NewMetricGauge("all", "Dialogs all.", "dlg.stats_active"),
},
"dmq.list_nodes": {
NewMetricGauge("dmq", "DMQ peer Status", "dmq.list_nodes"),
},
}
)

Expand Down Expand Up @@ -393,24 +404,22 @@ func (c *Collector) scrape(ch chan<- prometheus.Metric) error {

// scrapeMethod will return metrics for one method.
func (c *Collector) scrapeMethod(method string) (map[string][]MetricValue, error) {
records, err := c.fetchBINRPC(method)

records, err := c.fetchBINRPC(method) // returns []binrpc.Record
if err != nil {
return nil, err
}

// we expect just 1 record of type map
if len(records) == 2 && records[0].Type == binrpc.TypeInt && records[0].Value.(int) == 500 {
return nil, fmt.Errorf(`invalid response for method "%s": [500] %s`, method, records[1].Value.(string))
}
if method =="dmq.list_nodes" {
// This will contain multiple elements.
} else if len(records) != 1 {
return nil, fmt.Errorf(`invalid response for method "%s", expected %d record, got %d`,
method, 1, len(records),
return nil, fmt.Errorf(`invalid response for method "%s", expected %d record, got %d`,
method, 1, len(records),
)
}

// all methods implemented in this exporter return a struct
items, err := records[0].StructItems()

if err != nil {
return nil, err
}
Expand All @@ -423,7 +432,6 @@ func (c *Collector) scrapeMethod(method string) (map[string][]MetricValue, error
case "tm.stats":
for _, item := range items {
i, _ := item.Value.Int()

if codeRegex.MatchString(item.Key) {
// this item is a "code" statistic, eg "200" or "6xx"
metrics["codes"] = append(metrics["codes"],
Expand All @@ -446,6 +454,24 @@ func (c *Collector) scrapeMethod(method string) (map[string][]MetricValue, error
fallthrough
case "dlg.stats_active":
fallthrough
case "dmq.list_nodes":
nodes, err := parseDMQNodes(records)
if err != nil {
return nil, err
}
if len(nodes) == 0 {
break
}
for _, node := range nodes {
mv := MetricValue{
Value: 1,
Labels: map[string]string{
"host": node.Host,
"status": node.Status,
},
}
metrics["dmq"] = append(metrics["dmq"], mv)
}
case "core.uptime":
for _, item := range items {
i, _ := item.Value.Int()
Expand Down Expand Up @@ -565,6 +591,29 @@ func parseDispatcherTargets(items []binrpc.StructItem) ([]DispatcherTarget, erro
return result, nil
}


// parseDMQNodes parses the "dmq.list_nodes" result and returns a list of nodes
func parseDMQNodes(records []binrpc.Record) ([]DMQNode, error) {
var result []DMQNode
for _, record := range records{
items, err := record.StructItems()
node := DMQNode{}
for _, item := range items {
switch item.Key {
case "host":
node.Host, _ = item.Value.String()
case "status":
node.Status, _ = item.Value.String()
}
}
if err != nil {
return nil, err
}
result = append(result,node)
}
return result, nil
}

// fetchBINRPC talks to kamailio using the BINRPC protocol.
func (c *Collector) fetchBINRPC(method string) ([]binrpc.Record, error) {
// WritePacket returns the cookie generated
Expand All @@ -576,7 +625,8 @@ func (c *Collector) fetchBINRPC(method string) ([]binrpc.Record, error) {

// the cookie is passed again for verification
// we receive records in response
records, err := binrpc.ReadPacket(c.conn, cookie)

records, err := binrpc.ReadPacket(c.conn, cookie)

if err != nil {
return nil, err
Expand Down