Skip to content

Commit

Permalink
Add tcpstat connection states per source/dest ports
Browse files Browse the repository at this point in the history
Signed-off-by: Tomáš Kadaně <[email protected]>
  • Loading branch information
Tomáš Kadaně committed Jun 25, 2024
1 parent 4f7bd35 commit 0722d10
Show file tree
Hide file tree
Showing 2 changed files with 164 additions and 6 deletions.
101 changes: 95 additions & 6 deletions collector/tcpstat_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,14 @@
package collector

import (
"encoding/binary"
"fmt"
"os"
"strconv"
"syscall"
"unsafe"

"github.com/alecthomas/kingpin/v2"
"github.com/go-kit/log"
"github.com/mdlayher/netlink"
"github.com/prometheus/client_golang/prometheus"
Expand Down Expand Up @@ -58,9 +61,15 @@ const (
tcpTxQueuedBytes
)

var (
tcpstatSourcePorts = kingpin.Flag("collector.tcpstat.port.source", "List of tcpstat source ports").Strings()
tcpstatDestPorts = kingpin.Flag("collector.tcpstat.port.dest", "List of tcpstat destination ports").Strings()
)

type tcpStatCollector struct {
desc typedDesc
logger log.Logger
desc typedDesc
descPort typedDesc
logger log.Logger
}

func init() {
Expand All @@ -75,6 +84,11 @@ func NewTCPStatCollector(logger log.Logger) (Collector, error) {
"Number of connection states.",
[]string{"state"}, nil,
), prometheus.GaugeValue},
descPort: typedDesc{prometheus.NewDesc(
prometheus.BuildFQName(namespace, "tcp", "connection_states_port"),
"Number of connection states per port.",
[]string{"state", "port", "type"}, nil,
), prometheus.GaugeValue},
logger: logger,
}, nil
}
Expand Down Expand Up @@ -129,18 +143,28 @@ func parseInetDiagMsg(b []byte) *InetDiagMsg {
}

func (c *tcpStatCollector) Update(ch chan<- prometheus.Metric) error {
tcpStats, err := getTCPStats(syscall.AF_INET)
messages, err := getMessagesFromSocket(syscall.AF_INET)
if err != nil {
return fmt.Errorf("couldn't get tcpstats: %w", err)
}

tcpStats, err := parseTCPStats(messages)
if err != nil {
return fmt.Errorf("couldn't parse tcpstats: %w", err)
}

// if enabled ipv6 system
if _, hasIPv6 := os.Stat(procFilePath("net/tcp6")); hasIPv6 == nil {
tcp6Stats, err := getTCPStats(syscall.AF_INET6)
messagesIPv6, err := getMessagesFromSocket(syscall.AF_INET6)
if err != nil {
return fmt.Errorf("couldn't get tcp6stats: %w", err)
}

tcp6Stats, err := parseTCPStats(messagesIPv6)
if err != nil {
return fmt.Errorf("couldn't parse tcp6stats: %w", err)
}

for st, value := range tcp6Stats {
tcpStats[st] += value
}
Expand All @@ -150,10 +174,28 @@ func (c *tcpStatCollector) Update(ch chan<- prometheus.Metric) error {
ch <- c.desc.mustNewConstMetric(value, st.String())
}

statsPerSourcePorts, err := parseTCPStatsPerSourcePort(messages, *tcpstatSourcePorts)
if err != nil {
return fmt.Errorf("couldn't get tcpstats per source port: %w", err)
}

for statePort, value := range statsPerSourcePorts {
ch <- c.descPort.mustNewConstMetric(value, statePort.state.String(), strconv.Itoa(statePort.port), "source")
}

statsPerDestPorts, err := parseTCPStatsPerDestPort(messages, *tcpstatDestPorts)
if err != nil {
return fmt.Errorf("couldn't get tcpstats per dest port: %w", err)
}

for statePort, value := range statsPerDestPorts {
ch <- c.descPort.mustNewConstMetric(value, statePort.state.String(), strconv.Itoa(statePort.port), "dest")
}

return nil
}

func getTCPStats(family uint8) (map[tcpConnectionState]float64, error) {
func getMessagesFromSocket(family uint8) ([]netlink.Message, error) {
const TCPFAll = 0xFFF
const InetDiagInfo = 2
const SockDiagByFamily = 20
Expand Down Expand Up @@ -182,7 +224,7 @@ func getTCPStats(family uint8) (map[tcpConnectionState]float64, error) {
return nil, err
}

return parseTCPStats(messages)
return messages, nil
}

func parseTCPStats(msgs []netlink.Message) (map[tcpConnectionState]float64, error) {
Expand All @@ -199,6 +241,53 @@ func parseTCPStats(msgs []netlink.Message) (map[tcpConnectionState]float64, erro
return tcpStats, nil
}

type statePortPair struct {
state tcpConnectionState
port int
}

func parseTCPStatsPerSourcePort(msgs []netlink.Message, sourcePorts []string) (map[statePortPair]float64, error) {
tcpStatsPerStatePerPort := map[statePortPair]float64{}

for _, m := range msgs {
msg := parseInetDiagMsg(m.Data)

for _, sourcePort := range sourcePorts {
sourcePortInt := int(binary.BigEndian.Uint16(msg.ID.SourcePort[:]))

if sourcePort == strconv.Itoa(sourcePortInt) {
tcpStatsPerStatePerPort[statePortPair{
state: tcpConnectionState(msg.State),
port: sourcePortInt,
}]++
}
}
}

return tcpStatsPerStatePerPort, nil
}

func parseTCPStatsPerDestPort(msgs []netlink.Message, destPorts []string) (map[statePortPair]float64, error) {
tcpStatsPerStatePerPort := map[statePortPair]float64{}

for _, m := range msgs {
msg := parseInetDiagMsg(m.Data)

for _, destPort := range destPorts {
destPortInt := int(binary.BigEndian.Uint16(msg.ID.DestPort[:]))

if destPort == strconv.Itoa(destPortInt) {
tcpStatsPerStatePerPort[statePortPair{
state: tcpConnectionState(msg.State),
port: destPortInt,
}]++
}
}
}

return tcpStatsPerStatePerPort, nil
}

func (st tcpConnectionState) String() string {
switch st {
case tcpEstablished:
Expand Down
69 changes: 69 additions & 0 deletions collector/tcpstat_linux_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,5 +86,74 @@ func Test_parseTCPStats(t *testing.T) {
if want, got := 22, int(tcpStats[tcpRxQueuedBytes]); want != got {
t.Errorf("want tcpstat number of bytes in rx queue %d, got %d", want, got)
}
}

func Test_parseTCPStatsPerPort(t *testing.T) {
encode := func(m InetDiagMsg) []byte {
var buf bytes.Buffer
err := binary.Write(&buf, native.Endian, m)
if err != nil {
panic(err)
}
return buf.Bytes()
}

msg := []netlink.Message{
{
Data: encode(InetDiagMsg{
Family: syscall.AF_INET,
State: uint8(tcpEstablished),
ID: InetDiagSockID{
DestPort: [2]byte{0, 22},
},
}),
},
{
Data: encode(InetDiagMsg{
Family: syscall.AF_INET,
State: uint8(tcpEstablished),
ID: InetDiagSockID{
DestPort: [2]byte{0, 22},
SourcePort: [2]byte{0, 23},
},
}),
},
{
Data: encode(InetDiagMsg{
Family: syscall.AF_INET6,
State: uint8(tcpEstablished),
ID: InetDiagSockID{
SourcePort: [2]byte{0, 23},
},
}),
},
}

tcpStatsPerDestPort, err := parseTCPStatsPerDestPort(msg, []string{"22"})
if err != nil {
t.Fatal(err)
}

sp1 := statePortPair{
state: tcpEstablished,
port: 22,
}

sp2 := statePortPair{
state: tcpEstablished,
port: 23,
}

tcpStatsPerSourcePort, err := parseTCPStatsPerSourcePort(msg, []string{"23"})
if err != nil {
t.Fatal(err)
}

if want, got := 2, int(tcpStatsPerDestPort[sp1]); want != got {
t.Errorf("tcpstat connection per %s states per dest port %d. want %d, got %d", sp1.state.String(), sp1.port, want, got)
}

if want, got := 2, int(tcpStatsPerSourcePort[sp2]); want != got {
t.Errorf("tcpstat connection per %s states per source port %d. want %d, got %d", sp2.state.String(), sp2.port, want, got)
}
}

0 comments on commit 0722d10

Please sign in to comment.