Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

nvidia: Add power consumption and memory utilization metrics #11

Merged
merged 1 commit into from
Feb 24, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
217 changes: 164 additions & 53 deletions nvidia.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package main
import (
"bytes"
"encoding/binary"
"errors"
"fmt"
"log/slog"
"sort"
Expand All @@ -12,6 +13,14 @@ import (
"go.opentelemetry.io/collector/pdata/pmetric"
)

const (
attributeIndex = "index"
attributeUUID = "uuid"
metricNameGPUMemoryUtilizationPercent = "gpu_utilization_memory_percent"
metricNameGPUUtilizationPercent = "gpu_utilization_percent"
metricNameGPUPowerWatt = "gpu_power_watt"
)

type perDeviceState struct {
d nvml.Device
lastTimestamp uint64
Expand All @@ -21,26 +30,20 @@ type producer struct {
devices []perDeviceState
}

type byTs []nvml.Sample

func (a byTs) Len() int { return len(a) }
func (a byTs) Less(i, j int) bool { return a[i].TimeStamp < a[j].TimeStamp }
func (a byTs) Swap(i, j int) { a[i], a[j] = a[j], a[i] }

func NewNvidiaProducer() (*producer, error) {
ret := nvml.Init()
if ret != nvml.SUCCESS {
return nil, fmt.Errorf("Failed to initialize NVML library: %v", nvml.ErrorString(ret))
return nil, fmt.Errorf("failed to initialize NVML library: %v", nvml.ErrorString(ret))
}
count, ret := nvml.DeviceGetCount()
if ret != nvml.SUCCESS {
return nil, fmt.Errorf("Failed to get count of Nvidia devices: %v", nvml.ErrorString(ret))
return nil, fmt.Errorf("failed to get count of Nvidia devices: %v", nvml.ErrorString(ret))
}
devices := make([]perDeviceState, count)
for i := 0; i < count; i++ {
device, ret := nvml.DeviceGetHandleByIndex(i)
if ret != nvml.SUCCESS {
return nil, fmt.Errorf("Failed to get handle for Nvidia device %d: %v", i, nvml.ErrorString(ret))
return nil, fmt.Errorf("failed to get handle for Nvidia device %d: %v", i, nvml.ErrorString(ret))
}
devices[i] = perDeviceState{
d: device,
Expand All @@ -56,64 +59,172 @@ func (p *producer) Produce(ms pmetric.MetricSlice) error {
for i, pds := range p.devices {
uuid, ret := pds.d.GetUUID()
if ret != nvml.SUCCESS {
slog.Error("Failed to get device UUID", "index", i, "error", nvml.ErrorString(ret))
slog.Error("Failed to get device uuid", "index", i, "error", nvml.ErrorString(ret))
continue
}
slog.Debug("Collecting metrics for device", "uuid", uuid, "index", i)

m := ms.AppendEmpty()
g := m.SetEmptyGauge()
err := p.produceUtilization(pds, uuid, i, ms)
if err != nil {
slog.Error("Failed to get GPU utilization for device", "uuid", uuid, "index", i, "error", err)
continue
}

valueType, utilSamps, ret := pds.d.GetSamples(nvml.GPU_UTILIZATION_SAMPLES, pds.lastTimestamp)
if ret != nvml.SUCCESS {
slog.Error("Failed to get GPU utilization for device", "uuid", uuid, "index", i)
err = p.produceMemoryUtilization(pds, uuid, i, ms)
if err != nil {
slog.Error("Failed to get GPU memory utilization for device", "uuid", uuid, "index", i, "error", err)
continue
}
var setVal func(pmetric.NumberDataPoint, [8]byte)
switch valueType {
case nvml.VALUE_TYPE_DOUBLE:
setVal = func(dp pmetric.NumberDataPoint, val [8]byte) {
var value float64
// TODO - test this on a big-endian machine
err := binary.Read(bytes.NewReader(val[:]), binary.NativeEndian, &value)
if err != nil {
// justification for panic: this can never happen unless we've made
// a programming error.
panic(err)
}
dp.SetDoubleValue(value)
}
case nvml.VALUE_TYPE_UNSIGNED_INT, nvml.VALUE_TYPE_UNSIGNED_LONG, nvml.VALUE_TYPE_UNSIGNED_LONG_LONG, nvml.VALUE_TYPE_SIGNED_LONG_LONG, nvml.VALUE_TYPE_SIGNED_INT, nvml.VALUE_TYPE_COUNT:
setVal = func(dp pmetric.NumberDataPoint, val [8]byte) {
var value int64
// TODO - test this on a big-endian machine
err := binary.Read(bytes.NewReader(val[:]), binary.NativeEndian, &value)
if err != nil {
// justification for panic: this can never happen unless we've made
// a programming error.
panic(err)
}
dp.SetIntValue(value)
}
default:
slog.Error("Unknown value data type in GPU metrics", "type", valueType)

err = p.producePowerConsumption(pds, uuid, i, ms)
if err != nil {
slog.Error("Failed to get GPU memory utilization for device", "uuid", uuid, "index", i, "error", err)
continue
}
}

return nil
}

sort.Sort(byTs(utilSamps))
func (p *producer) produceUtilization(pds perDeviceState, uuid string, index int, ms pmetric.MetricSlice) error {
m := ms.AppendEmpty()
g := m.SetEmptyGauge()
m.SetName(metricNameGPUUtilizationPercent)

for _, samp := range utilSamps {
pds.lastTimestamp = max(pds.lastTimestamp, samp.TimeStamp)
sampleType, samples, ret := pds.d.GetSamples(nvml.GPU_UTILIZATION_SAMPLES, pds.lastTimestamp)
if !errors.Is(ret, nvml.SUCCESS) {
return ret
}
getValue, err := valueGetter(sampleType)
if err != nil {
return err
}

sort.Slice(samples, func(i, j int) bool {
return samples[i].TimeStamp < samples[j].TimeStamp
})

for _, s := range samples {
pds.lastTimestamp = max(pds.lastTimestamp, s.TimeStamp)

value := getValue(s.SampleValue).(int64)

dp := g.DataPoints().AppendEmpty()
dp.Attributes().PutStr(attributeUUID, uuid)
dp.Attributes().PutInt(attributeIndex, int64(index))
dp.SetTimestamp(pcommon.Timestamp(s.TimeStamp * 1000)) // micros to nanos
dp.SetIntValue(value)
}

return nil
}

func (p *producer) produceMemoryUtilization(pds perDeviceState, uuid string, index int, ms pmetric.MetricSlice) error {
m := ms.AppendEmpty()
g := m.SetEmptyGauge()
m.SetName(metricNameGPUMemoryUtilizationPercent)

sampleType, samples, ret := pds.d.GetSamples(nvml.MEMORY_UTILIZATION_SAMPLES, pds.lastTimestamp)
if !errors.Is(ret, nvml.SUCCESS) {
return ret
}
getValue, err := valueGetter(sampleType)
if err != nil {
return err
}

dp := g.DataPoints().AppendEmpty()
setVal(dp, samp.SampleValue)
sort.Slice(samples, func(i, j int) bool {
return samples[i].TimeStamp < samples[j].TimeStamp
})

// samp.TimeStamp is micros since epoch; pcommon.Timestamp expects
// nanos since epoch
dp.SetTimestamp(pcommon.Timestamp(samp.TimeStamp * 1000))
dp.Attributes().PutStr("UUID", uuid)
dp.Attributes().PutInt("index", int64(i))
for _, s := range samples {
if s.TimeStamp == 0 {
continue
}

pds.lastTimestamp = max(pds.lastTimestamp, s.TimeStamp)

value := getValue(s.SampleValue).(int64)
dp := g.DataPoints().AppendEmpty()
dp.Attributes().PutStr(attributeUUID, uuid)
dp.Attributes().PutInt(attributeIndex, int64(index))
dp.SetTimestamp(pcommon.Timestamp(s.TimeStamp * 1000)) // micros to nanos
dp.SetIntValue(value)
}

return nil
}

func (p *producer) producePowerConsumption(pds perDeviceState, uuid string, index int, ms pmetric.MetricSlice) error {
m := ms.AppendEmpty()
g := m.SetEmptyGauge()
m.SetName(metricNameGPUPowerWatt)

sampleType, samples, ret := pds.d.GetSamples(nvml.TOTAL_POWER_SAMPLES, pds.lastTimestamp)
if !errors.Is(ret, nvml.SUCCESS) {
return ret
}
getValue, err := valueGetter(sampleType)
if err != nil {
return err
}

sort.Slice(samples, func(i, j int) bool {
return samples[i].TimeStamp < samples[j].TimeStamp
})

for _, s := range samples {
if s.TimeStamp == 0 {
continue
}
value := getValue(s.SampleValue).(int64)
if value > 10000*1000 { // ignore if above 10k watt
continue
}
if value < 0 { // ignore negative power consumption
continue
}

pds.lastTimestamp = max(pds.lastTimestamp, s.TimeStamp)

dp := g.DataPoints().AppendEmpty()
dp.Attributes().PutStr(attributeUUID, uuid)
dp.Attributes().PutInt(attributeIndex, int64(index))
dp.SetTimestamp(pcommon.Timestamp(s.TimeStamp * 1000)) // micros to nanos
dp.SetIntValue(value)
}

return nil
}

func valueGetter(sampleType nvml.ValueType) (func([8]byte) any, error) {
switch sampleType {
case nvml.VALUE_TYPE_DOUBLE:
return func(val [8]byte) any {
var value float64
// TODO - test this on a big-endian machine
err := binary.Read(bytes.NewReader(val[:]), binary.NativeEndian, &value)
if err != nil {
// justification for panic: this can never happen unless we've made
// a programming error.
panic(err)
}
return value
// dp.SetDoubleValue(value)
}, nil
case nvml.VALUE_TYPE_UNSIGNED_INT, nvml.VALUE_TYPE_UNSIGNED_LONG, nvml.VALUE_TYPE_UNSIGNED_LONG_LONG, nvml.VALUE_TYPE_SIGNED_LONG_LONG, nvml.VALUE_TYPE_SIGNED_INT, nvml.VALUE_TYPE_COUNT:
return func(val [8]byte) any {
var value int64
// TODO - test this on a big-endian machine
err := binary.Read(bytes.NewReader(val[:]), binary.NativeEndian, &value)
if err != nil {
// justification for panic: this can never happen unless we've made
// a programming error.
panic(err)
}
return value
}, nil
default:
return nil, fmt.Errorf("unsupported sample type %v", sampleType)
}
}