From dbb7e0bac6fec2797bb05b57405ca3f8ab25c36a Mon Sep 17 00:00:00 2001
From: Matthias Loibl
Date: Mon, 24 Feb 2025 19:35:11 +0100
Subject: [PATCH] nvidia: Add power consumption and memory utilization metrics

To make things more ergonomic I've refactored the existing code.
We could probably refactor even more, but I felt that's good enough for now.
---
 nvidia.go | 237 +++++++++++++++++++++++++++++++++++++++++------------
 1 file changed, 184 insertions(+), 53 deletions(-)

diff --git a/nvidia.go b/nvidia.go
index bf9a5ad..2aa43d6 100644
--- a/nvidia.go
+++ b/nvidia.go
@@ -3,6 +3,7 @@ package main
 import (
 	"bytes"
 	"encoding/binary"
+	"errors"
 	"fmt"
 	"log/slog"
 	"sort"
@@ -12,6 +13,14 @@ import (
 	"go.opentelemetry.io/collector/pdata/pmetric"
 )
 
+const (
+	attributeIndex                        = "index"
+	attributeUUID                         = "uuid"
+	metricNameGPUMemoryUtilizationPercent = "gpu_utilization_memory_percent"
+	metricNameGPUUtilizationPercent       = "gpu_utilization_percent"
+	metricNameGPUPowerWatt                = "gpu_power_watt"
+)
+
 type perDeviceState struct {
 	d             nvml.Device
 	lastTimestamp uint64
@@ -21,26 +30,20 @@ type producer struct {
 	devices []perDeviceState
 }
 
-type byTs []nvml.Sample
-
-func (a byTs) Len() int           { return len(a) }
-func (a byTs) Less(i, j int) bool { return a[i].TimeStamp < a[j].TimeStamp }
-func (a byTs) Swap(i, j int)      { a[i], a[j] = a[j], a[i] }
-
 func NewNvidiaProducer() (*producer, error) {
 	ret := nvml.Init()
 	if ret != nvml.SUCCESS {
-		return nil, fmt.Errorf("Failed to initialize NVML library: %v", nvml.ErrorString(ret))
+		return nil, fmt.Errorf("failed to initialize NVML library: %v", nvml.ErrorString(ret))
 	}
 	count, ret := nvml.DeviceGetCount()
 	if ret != nvml.SUCCESS {
-		return nil, fmt.Errorf("Failed to get count of Nvidia devices: %v", nvml.ErrorString(ret))
+		return nil, fmt.Errorf("failed to get count of Nvidia devices: %v", nvml.ErrorString(ret))
 	}
 	devices := make([]perDeviceState, count)
 	for i := 0; i < count; i++ {
 		device, ret := nvml.DeviceGetHandleByIndex(i)
 		if ret != nvml.SUCCESS {
-			return nil, fmt.Errorf("Failed to get handle for Nvidia device %d: %v", i, nvml.ErrorString(ret))
+			return nil, fmt.Errorf("failed to get handle for Nvidia device %d: %v", i, nvml.ErrorString(ret))
 		}
 		devices[i] = perDeviceState{
 			d:             device,
@@ -56,64 +59,192 @@ func (p *producer) Produce(ms pmetric.MetricSlice) error {
 	for i, pds := range p.devices {
 		uuid, ret := pds.d.GetUUID()
 		if ret != nvml.SUCCESS {
-			slog.Error("Failed to get device UUID", "index", i, "error", nvml.ErrorString(ret))
+			slog.Error("Failed to get device uuid", "index", i, "error", nvml.ErrorString(ret))
 			continue
 		}
 		slog.Debug("Collecting metrics for device", "uuid", uuid, "index", i)
 
-		m := ms.AppendEmpty()
-		g := m.SetEmptyGauge()
+		// NOTE(review): pds is a copy of p.devices[i] and is passed by value, so
+		// lastTimestamp updates made inside the produce* helpers are not persisted.
+		err := p.produceUtilization(pds, uuid, i, ms)
+		if err != nil {
+			slog.Error("Failed to get GPU utilization for device", "uuid", uuid, "index", i, "error", err)
+			continue
+		}
 
-		valueType, utilSamps, ret := pds.d.GetSamples(nvml.GPU_UTILIZATION_SAMPLES, pds.lastTimestamp)
-		if ret != nvml.SUCCESS {
-			slog.Error("Failed to get GPU utilization for device", "uuid", uuid, "index", i)
+		err = p.produceMemoryUtilization(pds, uuid, i, ms)
+		if err != nil {
+			slog.Error("Failed to get GPU memory utilization for device", "uuid", uuid, "index", i, "error", err)
 			continue
 		}
-		var setVal func(pmetric.NumberDataPoint, [8]byte)
-		switch valueType {
-		case nvml.VALUE_TYPE_DOUBLE:
-			setVal = func(dp pmetric.NumberDataPoint, val [8]byte) {
-				var value float64
-				// TODO - test this on a big-endian machine
-				err := binary.Read(bytes.NewReader(val[:]), binary.NativeEndian, &value)
-				if err != nil {
-					// justification for panic: this can never happen unless we've made
-					// a programming error.
-					panic(err)
-				}
-				dp.SetDoubleValue(value)
-			}
-		case nvml.VALUE_TYPE_UNSIGNED_INT, nvml.VALUE_TYPE_UNSIGNED_LONG, nvml.VALUE_TYPE_UNSIGNED_LONG_LONG, nvml.VALUE_TYPE_SIGNED_LONG_LONG, nvml.VALUE_TYPE_SIGNED_INT, nvml.VALUE_TYPE_COUNT:
-			setVal = func(dp pmetric.NumberDataPoint, val [8]byte) {
-				var value int64
-				// TODO - test this on a big-endian machine
-				err := binary.Read(bytes.NewReader(val[:]), binary.NativeEndian, &value)
-				if err != nil {
-					// justification for panic: this can never happen unless we've made
-					// a programming error.
-					panic(err)
-				}
-				dp.SetIntValue(value)
-			}
-		default:
-			slog.Error("Unknown value data type in GPU metrics", "type", valueType)
+
+		err = p.producePowerConsumption(pds, uuid, i, ms)
+		if err != nil {
+			slog.Error("Failed to get GPU power consumption for device", "uuid", uuid, "index", i, "error", err)
 			continue
 		}
+	}
+
+	return nil
+}
 
-		sort.Sort(byTs(utilSamps))
+// produceUtilization appends a gauge metric named gpu_utilization_percent to ms
+// with one data point per GPU utilization sample returned by the device.
+func (p *producer) produceUtilization(pds perDeviceState, uuid string, index int, ms pmetric.MetricSlice) error {
+	m := ms.AppendEmpty()
+	g := m.SetEmptyGauge()
+	m.SetName(metricNameGPUUtilizationPercent)
 
-		for _, samp := range utilSamps {
-			pds.lastTimestamp = max(pds.lastTimestamp, samp.TimeStamp)
+	sampleType, samples, ret := pds.d.GetSamples(nvml.GPU_UTILIZATION_SAMPLES, pds.lastTimestamp)
+	if !errors.Is(ret, nvml.SUCCESS) {
+		return ret
+	}
+	getValue, err := valueGetter(sampleType)
+	if err != nil {
+		return err
+	}
+
+	sort.Slice(samples, func(i, j int) bool {
+		return samples[i].TimeStamp < samples[j].TimeStamp
+	})
+
+	for _, s := range samples {
+		pds.lastTimestamp = max(pds.lastTimestamp, s.TimeStamp)
+
+		value, ok := getValue(s.SampleValue).(int64)
+		if !ok {
+			return fmt.Errorf("unsupported non-integer sample type %v", sampleType)
+		}
+
+		dp := g.DataPoints().AppendEmpty()
+		dp.Attributes().PutStr(attributeUUID, uuid)
+		dp.Attributes().PutInt(attributeIndex, int64(index))
+		dp.SetTimestamp(pcommon.Timestamp(s.TimeStamp * 1000)) // micros to nanos
+		dp.SetIntValue(value)
+	}
+
+	return nil
+}
+
+// produceMemoryUtilization appends a gauge metric named
+// gpu_utilization_memory_percent to ms; zero-timestamp samples are skipped.
+func (p *producer) produceMemoryUtilization(pds perDeviceState, uuid string, index int, ms pmetric.MetricSlice) error {
+	m := ms.AppendEmpty()
+	g := m.SetEmptyGauge()
+	m.SetName(metricNameGPUMemoryUtilizationPercent)
+
+	sampleType, samples, ret := pds.d.GetSamples(nvml.MEMORY_UTILIZATION_SAMPLES, pds.lastTimestamp)
+	if !errors.Is(ret, nvml.SUCCESS) {
+		return ret
+	}
+	getValue, err := valueGetter(sampleType)
+	if err != nil {
+		return err
+	}
 
-			dp := g.DataPoints().AppendEmpty()
-			setVal(dp, samp.SampleValue)
+	sort.Slice(samples, func(i, j int) bool {
+		return samples[i].TimeStamp < samples[j].TimeStamp
+	})
 
-			// samp.TimeStamp is micros since epoch; pcommon.Timestamp expects
-			// nanos since epoch
-			dp.SetTimestamp(pcommon.Timestamp(samp.TimeStamp * 1000))
-			dp.Attributes().PutStr("UUID", uuid)
-			dp.Attributes().PutInt("index", int64(i))
+	for _, s := range samples {
+		if s.TimeStamp == 0 {
+			continue
 		}
+
+		pds.lastTimestamp = max(pds.lastTimestamp, s.TimeStamp)
+
+		value, ok := getValue(s.SampleValue).(int64)
+		if !ok {
+			return fmt.Errorf("unsupported non-integer sample type %v", sampleType)
+		}
+		dp := g.DataPoints().AppendEmpty()
+		dp.Attributes().PutStr(attributeUUID, uuid)
+		dp.Attributes().PutInt(attributeIndex, int64(index))
+		dp.SetTimestamp(pcommon.Timestamp(s.TimeStamp * 1000)) // micros to nanos
+		dp.SetIntValue(value)
 	}
+
 	return nil
 }
+
+// producePowerConsumption appends a gauge metric named gpu_power_watt to ms.
+// Samples with a zero timestamp, a negative value, or a value above
+// 10000*1000 are skipped.
+func (p *producer) producePowerConsumption(pds perDeviceState, uuid string, index int, ms pmetric.MetricSlice) error {
+	m := ms.AppendEmpty()
+	g := m.SetEmptyGauge()
+	m.SetName(metricNameGPUPowerWatt)
+
+	sampleType, samples, ret := pds.d.GetSamples(nvml.TOTAL_POWER_SAMPLES, pds.lastTimestamp)
+	if !errors.Is(ret, nvml.SUCCESS) {
+		return ret
+	}
+	getValue, err := valueGetter(sampleType)
+	if err != nil {
+		return err
+	}
+
+	sort.Slice(samples, func(i, j int) bool {
+		return samples[i].TimeStamp < samples[j].TimeStamp
+	})
+
+	for _, s := range samples {
+		if s.TimeStamp == 0 {
+			continue
+		}
+		value, ok := getValue(s.SampleValue).(int64)
+		if !ok {
+			return fmt.Errorf("unsupported non-integer sample type %v", sampleType)
+		}
+		if value > 10000*1000 { // ignore if above 10k watt
+			continue
+		}
+		if value < 0 { // ignore negative power consumption
+			continue
+		}
+
+		pds.lastTimestamp = max(pds.lastTimestamp, s.TimeStamp)
+
+		dp := g.DataPoints().AppendEmpty()
+		dp.Attributes().PutStr(attributeUUID, uuid)
+		dp.Attributes().PutInt(attributeIndex, int64(index))
+		dp.SetTimestamp(pcommon.Timestamp(s.TimeStamp * 1000)) // micros to nanos
+		dp.SetIntValue(value)
+	}
+
+	return nil
+}
+
+// valueGetter returns a decoder for the raw 8-byte sample value: it yields a
+// float64 for VALUE_TYPE_DOUBLE and an int64 for the integer value types, and
+// returns an error for any other sample type.
+func valueGetter(sampleType nvml.ValueType) (func([8]byte) any, error) {
+	switch sampleType {
+	case nvml.VALUE_TYPE_DOUBLE:
+		return func(val [8]byte) any {
+			var value float64
+			// TODO - test this on a big-endian machine
+			err := binary.Read(bytes.NewReader(val[:]), binary.NativeEndian, &value)
+			if err != nil {
+				// justification for panic: this can never happen unless we've made
+				// a programming error.
+				panic(err)
+			}
+			return value
+		}, nil
+	case nvml.VALUE_TYPE_UNSIGNED_INT, nvml.VALUE_TYPE_UNSIGNED_LONG, nvml.VALUE_TYPE_UNSIGNED_LONG_LONG, nvml.VALUE_TYPE_SIGNED_LONG_LONG, nvml.VALUE_TYPE_SIGNED_INT, nvml.VALUE_TYPE_COUNT:
+		return func(val [8]byte) any {
+			var value int64
+			// TODO - test this on a big-endian machine
+			err := binary.Read(bytes.NewReader(val[:]), binary.NativeEndian, &value)
+			if err != nil {
+				// justification for panic: this can never happen unless we've made
+				// a programming error.
+				panic(err)
+			}
+			return value
+		}, nil
+	default:
+		return nil, fmt.Errorf("unsupported sample type %v", sampleType)
+	}
+}