Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: v1 support diagnostic packages #1928

Draft
wants to merge 4 commits into
base: release/v1.63.0
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,8 @@ require (
gopkg.in/yaml.v3 v3.0.1
)

require github.com/open-telemetry/opentelemetry-collector-contrib/extension/opampcustommessages v0.111.0

require (
cel.dev/expr v0.16.0 // indirect
cloud.google.com/go/auth v0.9.8 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -2004,6 +2004,8 @@ github.com/open-telemetry/opentelemetry-collector-contrib/extension/oauth2client
github.com/open-telemetry/opentelemetry-collector-contrib/extension/oauth2clientauthextension v0.111.0/go.mod h1:66TKY0RWUw9GGZ8SQrr1CyXL17Z9eRvGfsY1WBkndMk=
github.com/open-telemetry/opentelemetry-collector-contrib/extension/oidcauthextension v0.111.0 h1:XvmX2mHl4Y4qKhRGfGbhLhfLg/+gSpcXhO6WcgQe2JM=
github.com/open-telemetry/opentelemetry-collector-contrib/extension/oidcauthextension v0.111.0/go.mod h1:I/Lk+XENTjQkO8ipfc12eA2LHxUc/zNJTJkvf64tnG4=
github.com/open-telemetry/opentelemetry-collector-contrib/extension/opampcustommessages v0.111.0 h1:cZVtc0BkL4iHOuUP28f+LuJiueUmd/MyYULt21YzQkY=
github.com/open-telemetry/opentelemetry-collector-contrib/extension/opampcustommessages v0.111.0/go.mod h1:wBrMy+mgk1llwg2zufyS6VNVZxavBQH97zJcxiDEWz8=
github.com/open-telemetry/opentelemetry-collector-contrib/extension/pprofextension v0.111.0 h1:m/u2iRttl/nEjp0EZ9w371LLAqogw0tDn+wPU6D7ArY=
github.com/open-telemetry/opentelemetry-collector-contrib/extension/pprofextension v0.111.0/go.mod h1:mmkCl306sRZYt/7uNmjvuRRvbe/xUDSDm8fEAGSSMKI=
github.com/open-telemetry/opentelemetry-collector-contrib/extension/sigv4authextension v0.111.0 h1:9k20EsVLgDoVxnJjp9Hg4Eosj5E+A77D5lr01fGQk0A=
Expand Down
65 changes: 65 additions & 0 deletions opamp/observiq/observiq_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package observiq

import (
"bytes"
"context"
"encoding/hex"
"errors"
Expand All @@ -35,6 +36,7 @@ import (
"github.com/open-telemetry/opamp-go/client/types"
"github.com/open-telemetry/opamp-go/protobufs"
"go.uber.org/zap"
"gopkg.in/yaml.v3"
)

var (
Expand Down Expand Up @@ -76,6 +78,8 @@ type Client struct {
collectorMntrWg sync.WaitGroup

currentConfig opamp.Config

supportPackageMux sync.Mutex
}

// NewClientArgs arguments passed when creating a new client
Expand Down Expand Up @@ -124,6 +128,7 @@ func NewClient(args *NewClientArgs) (opamp.Client, error) {
packagesStateProvider: newPackagesStateProvider(clientLogger, packagestate.DefaultFileName),
updaterManager: updaterManger,
reportManager: reportManager,
supportPackageMux: sync.Mutex{},
}

// Parse URL to determin scheme
Expand All @@ -149,6 +154,7 @@ func NewClient(args *NewClientArgs) (opamp.Client, error) {
err = observiqClient.opampClient.SetCustomCapabilities(&protobufs.CustomCapabilities{
Capabilities: []string{
measurements.ReportMeasurementsV1Capability,
diagnosticsReportV1Capability,
},
})
if err != nil {
Expand Down Expand Up @@ -350,6 +356,15 @@ func (c *Client) onMessageFuncHandler(ctx context.Context, msg *types.MessageDat
c.measurementsSender.Stop()
}
}
if msg.CustomMessage != nil {
msgCapability := msg.CustomMessage.GetCapability()
msgType := msg.CustomMessage.GetType()

if msgCapability == diagnosticsReportV1Capability &&
msgType == diagnosticsRequestType {
go c.handleDiagnosticPackageRequest(msg.CustomMessage.GetData())
}
}
}

func (c *Client) onRemoteConfigHandler(ctx context.Context, remoteConfig *protobufs.AgentRemoteConfig) error {
Expand Down Expand Up @@ -726,3 +741,53 @@ func (c *Client) safeGetDisconnecting() bool {
defer c.mutex.Unlock()
return c.disconnecting
}

func (c *Client) handleDiagnosticPackageRequest(data []byte) {
// Only one diagnostic package should be generated at a time.
c.supportPackageMux.Lock()
defer c.supportPackageMux.Unlock()

var req diagnosticRequestCustomMessage
err := yaml.Unmarshal(data, &req)
if err != nil {
c.logger.Error("Failed to unmarshal diagnostic request.", zap.Error(err))
return
}

// TODO: Support streaming (don't need to read full log files into memory)
// We can do this with io.Pipe I think
buf := &bytes.Buffer{}
di, err := newDiagnosticInfo(c.ident.agentID, c.ident.version)
if err != nil {
c.logger.Error("Failed to create diagnostic info.", zap.Error(err))
return
}

err = writeSupportPackage(buf, di)
if err != nil {
c.logger.Error("Failed to create unmarshal request.", zap.Error(err))
return
}

httpReq, err := http.NewRequestWithContext(context.TODO(), "PUT", req.ReportURL, buf)
if err != nil {
c.logger.Error("Failed to create http request.", zap.Error(err))
return
}

for k, v := range req.Headers {
httpReq.Header.Add(k, v)
}

resp, err := http.DefaultClient.Do(httpReq)
if err != nil {
c.logger.Error("Failed to PUT http request.", zap.Error(err))
return
}
defer resp.Body.Close()

if resp.StatusCode < 200 || resp.StatusCode > 299 {
c.logger.Error("Diagnostic PUT returned bad status code", zap.Int("status_code", resp.StatusCode))
return
}
}
201 changes: 201 additions & 0 deletions opamp/observiq/support.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,201 @@
package observiq

import (
"archive/tar"
"bytes"
"compress/gzip"
"fmt"
"io"
"os"
"path/filepath"
"runtime"
"runtime/pprof"
"time"

"github.com/shirou/gopsutil/v3/host"
"gopkg.in/yaml.v3"
)

const (
diagnosticsReportV1Capability = "com.bindplane.diagnostics.v1"
diagnosticsRequestType = "requestDiagnosticPackage"
)

type diagnosticRequestCustomMessage struct {
ReportURL string `yaml:"report_url"`
Headers map[string]string `yaml:"headers"`
}

type diagnosticInfo struct {
AgentID string
Version string
Goos string
GoArch string
HostInfo *host.InfoStat
}

func newDiagnosticInfo(agentID, version string) (diagnosticInfo, error) {
hi, err := host.Info()
if err != nil {
return diagnosticInfo{}, fmt.Errorf("stat hostinfo: %w", err)
}

return diagnosticInfo{
AgentID: agentID,
Version: version,
Goos: runtime.GOOS,
GoArch: runtime.GOARCH,
HostInfo: hi,
}, nil
}

func writeSupportPackage(writer io.Writer, di diagnosticInfo) error {
gzipWriter := gzip.NewWriter(writer)
defer gzipWriter.Close()

tw := tar.NewWriter(gzipWriter)
defer tw.Close()

diYaml, err := yaml.Marshal(di)
if err != nil {
return err
}

// Write basic agent info
if err := writeBytesToTar("agent-info.yaml", diYaml, tw); err != nil {
return fmt.Errorf("write info yaml: %w", err)
}

// Write log files
home := os.Getenv("OIQ_OTEL_COLLECTOR_HOME")
logsDir := filepath.Join(home, "log")

logsDirEntries, err := os.ReadDir(logsDir)
if err != nil {
return fmt.Errorf("read logs dir entries: %w", err)
}
for _, ent := range logsDirEntries {
if !ent.IsDir() {
path := filepath.Join(logsDir, ent.Name())
err := writeFileToTar(path, ent.Name(), tw)
if err != nil {
return fmt.Errorf("write log files: %w", err)
}
}
}

err = writeCPUProfileToTar("cpu.pprof", tw)
if err != nil {
return fmt.Errorf("write cpu profile: %w", err)
}

for _, p := range pprof.Profiles() {
err := writeProfileToTar(p.Name()+".pprof", p, tw)
if err != nil {
return fmt.Errorf("write profile %s: %w", p.Name(), err)
}
}

return nil
}

func writeBytesToTar(file string, by []byte, tw *tar.Writer) error {
err := tw.WriteHeader(&tar.Header{
Typeflag: tar.TypeReg,
Name: file,
Size: int64(len(by)),
Mode: 0666,
ModTime: time.Now(),
})
if err != nil {
return err
}

_, err = tw.Write(by)
if err != nil {
return err
}

return nil
}

func writeFileToTar(filePath, tarFile string, tw *tar.Writer) error {
f, err := os.Open(filePath)
if err != nil {
return fmt.Errorf("open file: %w", err)
}
defer f.Close()

fi, err := f.Stat()
if err != nil {
return fmt.Errorf("stat file: %w", err)
}

err = tw.WriteHeader(&tar.Header{
Typeflag: tar.TypeReg,
Name: tarFile,
Size: fi.Size(),
Mode: 0666,
ModTime: time.Now(),
})
if err != nil {
return fmt.Errorf("write tar header: %w", err)
}

if _, err = io.Copy(tw, f); err != nil {
return fmt.Errorf("copy file to tar: %w", err)
}

return nil
}

func writeCPUProfileToTar(file string, tw *tar.Writer) error {
by := &bytes.Buffer{}
pprof.StartCPUProfile(by)
time.Sleep(15 * time.Second)
pprof.StopCPUProfile()

err := tw.WriteHeader(&tar.Header{
Typeflag: tar.TypeReg,
Name: file,
Size: int64(by.Len()),
Mode: 0666,
ModTime: time.Now(),
})
if err != nil {
return err
}

_, err = tw.Write(by.Bytes())
if err != nil {
return err
}

return nil
}

func writeProfileToTar(file string, profile *pprof.Profile, tw *tar.Writer) error {
by := &bytes.Buffer{}
err := profile.WriteTo(by, 0)
if err != nil {
return fmt.Errorf("render profile: %w", err)
}

err = tw.WriteHeader(&tar.Header{
Typeflag: tar.TypeReg,
Name: file,
Size: int64(by.Len()),
Mode: 0666,
ModTime: time.Now(),
})
if err != nil {
return err
}

_, err = tw.Write(by.Bytes())
if err != nil {
return err
}

return nil
}