Skip to content

Commit

Permalink
Merge pull request #760 from csfldf/dev/net_metric
Browse files Browse the repository at this point in the history
feat(metric): add NIC speed, tx_bps, rx_bps metrics
  • Loading branch information
xu282934741 authored Jan 14, 2025
2 parents 356adb7 + 1afa6a7 commit 0c24ce6
Show file tree
Hide file tree
Showing 7 changed files with 97 additions and 17 deletions.
4 changes: 4 additions & 0 deletions pkg/consts/metric.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,11 +99,13 @@ const (
MetricNetTcpRetranSegs = "net.tcp.retrans_segs"
MetricNetTcpRecvPackets = "net.tcp.out_segs"
MetricNetTcpCloseWait = "net.tcp.close_wait"
MetricNetUpdateTime = "net.updatetime"
)

// System network metrics
const (
MetricNetReceiveBytes = "net.tcp.receive_bytes"
MetricNetReceiveBPS = "net.receive.bps"
MetricNetReceivePackets = "net.tcp.receive_packets"
MetricNetReceiveErrs = "net.tcp.receive_errs"
MetricNetReceiveDrops = "net.tcp.receive_drop"
Expand All @@ -112,13 +114,15 @@ const (
MetricNetReceiveCompressed = "net.tcp.receive_compressed"
MetricNetTransmitMulticast = "net.tcp.receive_multicast"
MetricNetTransmitBytes = "net.tcp.transmit_bytes"
MetricNetTransmitBPS = "net.transmit.bps"
MetricNetTransmitPackets = "net.tcp.transmit_packets"
MetricNetTransmitErrs = "net.tcp.transmit_errs"
MetricNetTransmitDrops = "net.tcp.transmit_drop"
MetricNetTransmitFIFO = "net.tcp.transmit_fifo"
MetricNetTransmitColls = "net.tcp.transmit_colls"
MetricNetTransmitCarrier = "net.tcp.transmit_carrier"
MetricNetTransmitCompressed = "net.tcp.transmit_compressed"
MetricNetSpeed = "net.speed"
)

// Node filesystem metrics
Expand Down
4 changes: 4 additions & 0 deletions pkg/metaserver/agent/metric/fake_metric.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,10 @@ func (f *FakeMetricsFetcher) GetNumaMetric(numaID int, metricName string) (metri
return f.checkMetricDataExpire(f.metricStore.GetNumaMetric(numaID, metricName))
}

func (f *FakeMetricsFetcher) GetNetworkMetric(networkName string, metricName string) (metric.MetricData, error) {
return f.checkMetricDataExpire(f.metricStore.GetDeviceMetric(networkName, metricName))
}

func (f *FakeMetricsFetcher) GetDeviceMetric(deviceName string, metricName string) (metric.MetricData, error) {
return f.checkMetricDataExpire(f.metricStore.GetDeviceMetric(deviceName, metricName))
}
Expand Down
4 changes: 4 additions & 0 deletions pkg/metaserver/agent/metric/metric_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,10 @@ func (f *MetricsFetcherImpl) GetDeviceMetric(deviceName string, metricName strin
return f.checkMetricDataExpire(f.metricStore.GetDeviceMetric(deviceName, metricName))
}

func (f *MetricsFetcherImpl) GetNetworkMetric(networkName string, metricName string) (utilmetric.MetricData, error) {
return f.checkMetricDataExpire(f.metricStore.GetNetworkMetric(networkName, metricName))
}

func (f *MetricsFetcherImpl) GetCPUMetric(coreID int, metricName string) (utilmetric.MetricData, error) {
return f.checkMetricDataExpire(f.metricStore.GetCPUMetric(coreID, metricName))
}
Expand Down
7 changes: 7 additions & 0 deletions pkg/metaserver/agent/metric/metric_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ func Test_notifySystem(t *testing.T) {
now := time.Now()
m.metricStore.SetNodeMetric("test-node-metric", metric.MetricData{Value: 34, Time: &now})
m.metricStore.SetNumaMetric(1, "test-numa-metric", metric.MetricData{Value: 56, Time: &now})
m.metricStore.SetNetworkMetric("eth0", "test-net-metric", metric.MetricData{Value: 56, Time: &now})
m.metricStore.SetCPUMetric(2, "test-cpu-metric", metric.MetricData{Value: 78, Time: &now})
m.metricStore.SetDeviceMetric("test-device", "test-device-metric", metric.MetricData{Value: 91, Time: &now})
m.metricStore.SetContainerMetric("test-pod", "test-container", "test-container-metric", metric.MetricData{Value: 91, Time: &now})
Expand Down Expand Up @@ -152,6 +153,12 @@ func Test_notifySystem(t *testing.T) {
}
}
assert.Equal(t, 8, totalNotification)
_, err := f.GetNetworkMetric("eth0", "test-net-metric")
assert.Nil(t, err)

ff := NewFakeMetricsFetcher(metrics.DummyMetrics{})
_, err = ff.GetNetworkMetric("eth0", "test-net-metric")
assert.NotNil(t, err)
}

func TestStore_Aggregate(t *testing.T) {
Expand Down
58 changes: 58 additions & 0 deletions pkg/metaserver/agent/metric/provisioner/malachite/provisioner.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package malachite

import (
"context"
"fmt"
"math"
"strconv"
"strings"
Expand All @@ -38,6 +39,7 @@ import (
"github.com/kubewharf/katalyst-core/pkg/util/cgroup/common"
"github.com/kubewharf/katalyst-core/pkg/util/general"
"github.com/kubewharf/katalyst-core/pkg/util/machine"
"github.com/kubewharf/katalyst-core/pkg/util/metric"
utilmetric "github.com/kubewharf/katalyst-core/pkg/util/metric"
)

Expand Down Expand Up @@ -412,6 +414,7 @@ func (m *MalachiteMetricsProvisioner) processSystemNetData(systemNetData *malach
if systemNetData == nil {
return
}

// todo, currently we only get a unified data for the whole system io data
updateTime := time.Unix(systemNetData.UpdateTime, 0)

Expand All @@ -433,6 +436,8 @@ func (m *MalachiteMetricsProvisioner) processSystemNetData(systemNetData *malach
utilmetric.MetricData{Value: float64(systemNetData.TCP.TCPOutSegs), Time: &updateTime})
m.metricStore.SetNodeMetric(consts.MetricNetTcpCloseWait,
utilmetric.MetricData{Value: float64(systemNetData.TCP.TCPCloseWait), Time: &updateTime})
m.metricStore.SetNodeMetric(consts.MetricNetUpdateTime,
utilmetric.MetricData{Value: float64(systemNetData.UpdateTime), Time: &updateTime})

for _, device := range systemNetData.NetworkCard {
// for now, we will only consider standard network interface
Expand All @@ -441,6 +446,14 @@ func (m *MalachiteMetricsProvisioner) processSystemNetData(systemNetData *malach
continue
}

errs := []error{}
// setNetworkRateMetric will use metricStore.GetNetworkMetric to get previous round metric,
// we should call setNetworkRateMetric before calling SetNetworkMetric
errs = append(errs, m.setNetworkRateMetric(device.Name, consts.MetricNetTransmitBPS,
consts.MetricNetTransmitBytes, float64(device.TransmitBytes), &updateTime))
errs = append(errs, m.setNetworkRateMetric(device.Name, consts.MetricNetReceiveBPS,
consts.MetricNetReceiveBytes, float64(device.ReceiveBytes), &updateTime))

m.metricStore.SetNetworkMetric(device.Name, consts.MetricNetReceiveBytes,
utilmetric.MetricData{Value: float64(device.ReceiveBytes), Time: &updateTime})
m.metricStore.SetNetworkMetric(device.Name, consts.MetricNetReceivePackets,
Expand Down Expand Up @@ -474,7 +487,52 @@ func (m *MalachiteMetricsProvisioner) processSystemNetData(systemNetData *malach
m.metricStore.SetNetworkMetric(device.Name, consts.MetricNetTransmitCompressed,
utilmetric.MetricData{Value: float64(device.TransmitCompressed), Time: &updateTime})

if device.Speeds != nil {
m.metricStore.SetNetworkMetric(device.Name, consts.MetricNetSpeed,
utilmetric.MetricData{Value: float64(*device.Speeds), Time: &updateTime})
}

aggErrs := errors.NewAggregate(errs)

if aggErrs != nil {
general.Warningf("set network metrics for: %s got errors: %s", device.Name, aggErrs.Error())
}
}
}

func (m *MalachiteMetricsProvisioner) setNetworkRateMetric(deviceName,
rateMetricName, valueMetricName string,
curValue float64,
curUpdateTime *time.Time,
) error {
lastMetric, err := m.metricStore.GetNetworkMetric(deviceName, valueMetricName)
if err != nil {
return fmt.Errorf("get value metric: %s for %s failed with err: %v",
valueMetricName, rateMetricName, err)
}

lastValue := lastMetric.Value
lastUpdateTime := lastMetric.Time

if curUpdateTime == nil || lastUpdateTime == nil || curUpdateTime.Unix() == 0 || lastUpdateTime.Unix() == 0 {
return fmt.Errorf("nil curUpdateTime or lastUpdateTime for rateMetricName: %s", rateMetricName)
}

timeDeltaInSec := curUpdateTime.Sub(*lastUpdateTime).Seconds()

if timeDeltaInSec <= 0 {
return fmt.Errorf("invalid timeDelta: %.2f", timeDeltaInSec)
}

if (curValue > lastValue) && (curValue != 0) && (lastValue != 0) {
m.metricStore.SetNetworkMetric(deviceName, rateMetricName,
metric.MetricData{Value: (curValue - lastValue) / timeDeltaInSec, Time: curUpdateTime})
} else {
return fmt.Errorf("invalid curValue: %.2f, lastValue: %.2f for rateMetricName: %s",
curValue, lastValue, rateMetricName)
}

return nil
}

func (m *MalachiteMetricsProvisioner) processSystemNumaData(systemMemoryData *malachitetypes.SystemMemoryData, systemComputeData *malachitetypes.SystemComputeData) {
Expand Down
35 changes: 18 additions & 17 deletions pkg/metaserver/agent/metric/provisioner/malachite/types/system.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,23 +49,24 @@ type SystemNetworkData struct {
}

type NetworkCard struct {
Name string `json:"name"`
ReceiveBytes uint64 `json:"receive_bytes"`
ReceivePackets uint64 `json:"receive_packets"`
ReceiveErrs uint64 `json:"receive_errs"`
ReceiveDrop uint64 `json:"receive_drop"`
ReceiveFifo uint64 `json:"receive_fifo"`
ReceiveFrame uint64 `json:"receive_frame"`
ReceiveCompressed uint64 `json:"receive_compressed"`
ReceiveMulticast uint64 `json:"receive_multicast"`
TransmitBytes uint64 `json:"transmit_bytes"`
TransmitPackets uint64 `json:"transmit_packets"`
TransmitErrs uint64 `json:"transmit_errs"`
TransmitDrop uint64 `json:"transmit_drop"`
TransmitFifo uint64 `json:"transmit_fifo"`
TransmitColls uint64 `json:"transmit_colls"`
TransmitCarrier uint64 `json:"transmit_carrier"`
TransmitCompressed uint64 `json:"transmit_compressed"`
Name string `json:"name"`
ReceiveBytes uint64 `json:"receive_bytes"`
ReceivePackets uint64 `json:"receive_packets"`
ReceiveErrs uint64 `json:"receive_errs"`
ReceiveDrop uint64 `json:"receive_drop"`
ReceiveFifo uint64 `json:"receive_fifo"`
ReceiveFrame uint64 `json:"receive_frame"`
ReceiveCompressed uint64 `json:"receive_compressed"`
ReceiveMulticast uint64 `json:"receive_multicast"`
TransmitBytes uint64 `json:"transmit_bytes"`
TransmitPackets uint64 `json:"transmit_packets"`
TransmitErrs uint64 `json:"transmit_errs"`
TransmitDrop uint64 `json:"transmit_drop"`
TransmitFifo uint64 `json:"transmit_fifo"`
TransmitColls uint64 `json:"transmit_colls"`
TransmitCarrier uint64 `json:"transmit_carrier"`
TransmitCompressed uint64 `json:"transmit_compressed"`
Speeds *uint64 `json:"speeds"`
}

type TCP struct {
Expand Down
2 changes: 2 additions & 0 deletions pkg/metaserver/agent/metric/types/metric.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ type MetricsReader interface {
GetNumaMetric(numaID int, metricName string) (metric.MetricData, error)
// GetDeviceMetric get metric of device.
GetDeviceMetric(deviceName string, metricName string) (metric.MetricData, error)
// GetDeviceMetric get metric of network.
GetNetworkMetric(networkName string, metricName string) (metric.MetricData, error)
// GetCPUMetric get metric of cpu.
GetCPUMetric(coreID int, metricName string) (metric.MetricData, error)
// GetContainerMetric get metric of container.
Expand Down

0 comments on commit 0c24ce6

Please sign in to comment.