From 06a2ac16f5ceb63e4142ddf942157aa00164bdcc Mon Sep 17 00:00:00 2001 From: Brandon Johnson Date: Thu, 17 Oct 2024 19:16:33 -0400 Subject: [PATCH 1/4] port v2 logic to v1 agent --- opamp/observiq/observiq_client.go | 56 ++++++++++++++++ opamp/observiq/support.go | 104 ++++++++++++++++++++++++++++++ 2 files changed, 160 insertions(+) create mode 100644 opamp/observiq/support.go diff --git a/opamp/observiq/observiq_client.go b/opamp/observiq/observiq_client.go index 9c250425d..bd3a38fb4 100644 --- a/opamp/observiq/observiq_client.go +++ b/opamp/observiq/observiq_client.go @@ -16,12 +16,14 @@ package observiq import ( + "bytes" "context" "encoding/hex" "errors" "fmt" "net/http" "net/url" + "os" "slices" "sync" @@ -35,6 +37,7 @@ import ( "github.com/open-telemetry/opamp-go/client/types" "github.com/open-telemetry/opamp-go/protobufs" "go.uber.org/zap" + "gopkg.in/yaml.v3" ) var ( @@ -149,6 +152,7 @@ func NewClient(args *NewClientArgs) (opamp.Client, error) { err = observiqClient.opampClient.SetCustomCapabilities(&protobufs.CustomCapabilities{ Capabilities: []string{ measurements.ReportMeasurementsV1Capability, + diagnosticsReportV1Capability, }, }) if err != nil { @@ -350,6 +354,15 @@ func (c *Client) onMessageFuncHandler(ctx context.Context, msg *types.MessageDat c.measurementsSender.Stop() } } + if msg.CustomMessage != nil { + msgCapability := msg.CustomMessage.GetCapability() + msgType := msg.CustomMessage.GetType() + + if msgCapability == diagnosticsReportV1Capability && + msgType == diagnosticsRequestType { + c.handleDiagnosticPackageRequest(msg.CustomMessage.GetData()) + } + } } func (c *Client) onRemoteConfigHandler(ctx context.Context, remoteConfig *protobufs.AgentRemoteConfig) error { @@ -726,3 +739,46 @@ func (c *Client) safeGetDisconnecting() bool { defer c.mutex.Unlock() return c.disconnecting } + +func (c *Client) handleDiagnosticPackageRequest(data []byte) { + var req diagnosticRequestCustomMessage + err := yaml.Unmarshal(data, &req) + if err != nil { + c.logger.Error("Failed to unmarshal diagnostic request.", zap.Error(err)) + return + } + + // TODO: Support streaming (don't need to read full log files into memory) + buf := &bytes.Buffer{} + // TODO: Pass actual ID/version + di := newDiagnosticInfo(c.ident.agentID, c.ident.version) + err = writeSupportPackage(buf, di) + if err != nil { + c.logger.Error("Failed to unmarshal diagnostic request.", zap.Error(err)) + return + } + + _ = os.WriteFile("./test-out.tar.gz", buf.Bytes(), 0600) + + httpReq, err := http.NewRequestWithContext(context.TODO(), "PUT", req.ReportURL, buf) + if err != nil { + c.logger.Error("Failed to create http request.", zap.Error(err)) + return + } + + for k, v := range req.Headers { + httpReq.Header.Add(k, v) + } + + resp, err := http.DefaultClient.Do(httpReq) + if err != nil { + c.logger.Error("Failed to PUT http request.", zap.Error(err)) + return + } + defer resp.Body.Close() + + if resp.StatusCode < 200 || resp.StatusCode > 299 { + c.logger.Error("Diagnostic PUT returned bad status code", zap.Int("status_code", resp.StatusCode)) + return + } +} diff --git a/opamp/observiq/support.go b/opamp/observiq/support.go new file mode 100644 index 000000000..399ec10c8 --- /dev/null +++ b/opamp/observiq/support.go @@ -0,0 +1,104 @@ +package observiq + +import ( + "archive/tar" + "compress/gzip" + "fmt" + "io" + "os" + "runtime" + + "gopkg.in/yaml.v3" +) + +const ( + diagnosticsReportV1Capability = "com.bindplane.diagnostics.v1" + diagnosticsRequestType = "requestDiagnosticPackage" +) + +type diagnosticRequestCustomMessage struct { + ReportURL string `yaml:"report_url"` + Headers map[string]string `yaml:"headers"` +} + +type diagnosticInfo struct { + AgentID string + Version string + Goos string + GoArch string +} + +func newDiagnosticInfo(agentID, version string) diagnosticInfo { + return diagnosticInfo{ + AgentID: agentID, + Version: version, + Goos: runtime.GOOS, + GoArch: runtime.GOARCH, + } +} + +func writeSupportPackage(writer io.Writer, di diagnosticInfo) error { + gzipWriter := gzip.NewWriter(writer) + defer gzipWriter.Close() + + tw := tar.NewWriter(gzipWriter) + + diYaml, err := yaml.Marshal(di) + if err != nil { + return err + } + + if err := writeBytesToTar("diagnostic-info.yaml", diYaml, tw); err != nil { + return fmt.Errorf("write info yaml: %w", err) + } + + return tw.Close() +} + +func writeBytesToTar(file string, by []byte, tw *tar.Writer) error { + err := tw.WriteHeader(&tar.Header{ + Typeflag: tar.TypeReg, + Name: file, + Size: int64(len(by)), + Mode: 0666, + }) + if err != nil { + return err + } + + _, err = tw.Write(by) + if err != nil { + return err + } + + return nil +} + +func writeFileToTar(filePath, tarFile string, tw *tar.Writer) error { + f, err := os.Open(filePath) + if err != nil { + return fmt.Errorf("open file: %w", err) + } + defer f.Close() + + fi, err := f.Stat() + if err != nil { + return fmt.Errorf("stat file: %w", err) + } + + err = tw.WriteHeader(&tar.Header{ + Typeflag: tar.TypeReg, + Name: tarFile, + Size: fi.Size(), + Mode: 0666, + }) + if err != nil { + return fmt.Errorf("write tar header: %w", err) + } + + if _, err = io.Copy(tw, f); err != nil { + return fmt.Errorf("copy file to tar: %w", err) + } + + return nil +} From 98024fee768793f07303e5ff7bab27cd915c6822 Mon Sep 17 00:00:00 2001 From: Brandon Johnson Date: Thu, 17 Oct 2024 19:58:14 -0400 Subject: [PATCH 2/4] add logs and os info --- go.mod | 2 ++ go.sum | 2 ++ opamp/observiq/observiq_client.go | 13 +++++---- opamp/observiq/support.go | 48 ++++++++++++++++++++++++------- 4 files changed, 49 insertions(+), 16 deletions(-) diff --git a/go.mod b/go.mod index e1c7ba572..104b42abb 100644 --- a/go.mod +++ b/go.mod @@ -199,6 +199,8 @@ require ( gopkg.in/yaml.v3 v3.0.1 ) +require github.com/open-telemetry/opentelemetry-collector-contrib/extension/opampcustommessages v0.111.0 + require ( cel.dev/expr v0.16.0 // indirect cloud.google.com/go/auth v0.9.8 // indirect diff --git a/go.sum b/go.sum index 6092a8bf9..69ff74774 100644 --- a/go.sum +++ b/go.sum @@ -2004,6 +2004,8 @@ github.com/open-telemetry/opentelemetry-collector-contrib/extension/oauth2client github.com/open-telemetry/opentelemetry-collector-contrib/extension/oauth2clientauthextension v0.111.0/go.mod h1:66TKY0RWUw9GGZ8SQrr1CyXL17Z9eRvGfsY1WBkndMk= github.com/open-telemetry/opentelemetry-collector-contrib/extension/oidcauthextension v0.111.0 h1:XvmX2mHl4Y4qKhRGfGbhLhfLg/+gSpcXhO6WcgQe2JM= github.com/open-telemetry/opentelemetry-collector-contrib/extension/oidcauthextension v0.111.0/go.mod h1:I/Lk+XENTjQkO8ipfc12eA2LHxUc/zNJTJkvf64tnG4= +github.com/open-telemetry/opentelemetry-collector-contrib/extension/opampcustommessages v0.111.0 h1:cZVtc0BkL4iHOuUP28f+LuJiueUmd/MyYULt21YzQkY= +github.com/open-telemetry/opentelemetry-collector-contrib/extension/opampcustommessages v0.111.0/go.mod h1:wBrMy+mgk1llwg2zufyS6VNVZxavBQH97zJcxiDEWz8= github.com/open-telemetry/opentelemetry-collector-contrib/extension/pprofextension v0.111.0 h1:m/u2iRttl/nEjp0EZ9w371LLAqogw0tDn+wPU6D7ArY= github.com/open-telemetry/opentelemetry-collector-contrib/extension/pprofextension v0.111.0/go.mod h1:mmkCl306sRZYt/7uNmjvuRRvbe/xUDSDm8fEAGSSMKI= github.com/open-telemetry/opentelemetry-collector-contrib/extension/sigv4authextension v0.111.0 h1:9k20EsVLgDoVxnJjp9Hg4Eosj5E+A77D5lr01fGQk0A= diff --git a/opamp/observiq/observiq_client.go b/opamp/observiq/observiq_client.go index bd3a38fb4..69482f2db 100644 --- a/opamp/observiq/observiq_client.go +++ b/opamp/observiq/observiq_client.go @@ -23,7 +23,6 @@ import ( "fmt" "net/http" "net/url" - "os" "slices" "sync" @@ -750,15 +749,17 @@ func (c *Client) handleDiagnosticPackageRequest(data []byte) { // TODO: Support streaming (don't need to read full log files into memory) buf := &bytes.Buffer{} - // TODO: Pass actual ID/version - di := newDiagnosticInfo(c.ident.agentID, c.ident.version) - err = writeSupportPackage(buf, di) + di, err := newDiagnosticInfo(c.ident.agentID, c.ident.version) if err != nil { - c.logger.Error("Failed to unmarshal diagnostic request.", zap.Error(err)) + c.logger.Error("Failed to create diagnostic info.", zap.Error(err)) return } - _ = os.WriteFile("./test-out.tar.gz", buf.Bytes(), 0600) + err = writeSupportPackage(buf, di) + if err != nil { + c.logger.Error("Failed to create unmarshal request.", zap.Error(err)) + return + } httpReq, err := http.NewRequestWithContext(context.TODO(), "PUT", req.ReportURL, buf) if err != nil { diff --git a/opamp/observiq/support.go b/opamp/observiq/support.go index 399ec10c8..d88cc7c04 100644 --- a/opamp/observiq/support.go +++ b/opamp/observiq/support.go @@ -6,8 +6,10 @@ import ( "fmt" "io" "os" + "path/filepath" "runtime" + "github.com/shirou/gopsutil/v3/host" "gopkg.in/yaml.v3" ) @@ -22,19 +24,26 @@ type diagnosticRequestCustomMessage struct { } type diagnosticInfo struct { - AgentID string - Version string - Goos string - GoArch string + AgentID string + Version string + Goos string + GoArch string + HostInfo *host.InfoStat } -func newDiagnosticInfo(agentID, version string) diagnosticInfo { - return diagnosticInfo{ - AgentID: agentID, - Version: version, - Goos: runtime.GOOS, - GoArch: runtime.GOARCH, +func newDiagnosticInfo(agentID, version string) (diagnosticInfo, error) { + hi, err := host.Info() + if err != nil { + return diagnosticInfo{}, fmt.Errorf("stat hostinfo: %w", err) } + + return diagnosticInfo{ + AgentID: agentID, + Version: version, + Goos: runtime.GOOS, + GoArch: runtime.GOARCH, + HostInfo: hi, + }, nil } func writeSupportPackage(writer io.Writer, di diagnosticInfo) error { @@ -48,10 +57,29 @@ func writeSupportPackage(writer io.Writer, di diagnosticInfo) error { return err } + // Write basic agent info if err := writeBytesToTar("diagnostic-info.yaml", diYaml, tw); err != nil { return fmt.Errorf("write info yaml: %w", err) } + // Write log files + home := os.Getenv("OIQ_OTEL_COLLECTOR_HOME") + logsDir := filepath.Join(home, "log") + + logsDirEntries, err := os.ReadDir(logsDir) + if err != nil { + return fmt.Errorf("read logs dir entries: %w", err) + } + for _, ent := range logsDirEntries { + if !ent.IsDir() { + path := filepath.Join(logsDir, ent.Name()) + err := writeFileToTar(path, ent.Name(), tw) + if err != nil { + return fmt.Errorf("write log files: %w", err) + } + } + } + return tw.Close() } From d0fc3b89402aaa66b82d11f3bd1e2013b28f3f4d Mon Sep 17 00:00:00 2001 From: Brandon Johnson Date: Thu, 17 Oct 2024 20:14:31 -0400 Subject: [PATCH 3/4] add profiles --- opamp/observiq/observiq_client.go | 10 ++++- opamp/observiq/support.go | 71 ++++++++++++++++++++++++++++++- 2 files changed, 79 insertions(+), 2 deletions(-) diff --git a/opamp/observiq/observiq_client.go b/opamp/observiq/observiq_client.go index 69482f2db..570dbfcf5 100644 --- a/opamp/observiq/observiq_client.go +++ b/opamp/observiq/observiq_client.go @@ -78,6 +78,8 @@ type Client struct { collectorMntrWg sync.WaitGroup currentConfig opamp.Config + + supportPackageMux sync.Mutex } // NewClientArgs arguments passed when creating a new client @@ -126,6 +128,7 @@ func NewClient(args *NewClientArgs) (opamp.Client, error) { packagesStateProvider: newPackagesStateProvider(clientLogger, packagestate.DefaultFileName), updaterManager: updaterManger, reportManager: reportManager, + supportPackageMux: sync.Mutex{}, } // Parse URL to determin scheme @@ -359,7 +362,7 @@ func (c *Client) onMessageFuncHandler(ctx context.Context, msg *types.MessageDat if msgCapability == diagnosticsReportV1Capability && msgType == diagnosticsRequestType { - c.handleDiagnosticPackageRequest(msg.CustomMessage.GetData()) + go c.handleDiagnosticPackageRequest(msg.CustomMessage.GetData()) } } } @@ -740,6 +743,10 @@ func (c *Client) safeGetDisconnecting() bool { } func (c *Client) handleDiagnosticPackageRequest(data []byte) { + // Only one diagnostic package should be generated at a time. + c.supportPackageMux.Lock() + defer c.supportPackageMux.Unlock() + var req diagnosticRequestCustomMessage err := yaml.Unmarshal(data, &req) if err != nil { @@ -748,6 +755,7 @@ func (c *Client) handleDiagnosticPackageRequest(data []byte) { } // TODO: Support streaming (don't need to read full log files into memory) + // We can do this with io.Pipe I think buf := &bytes.Buffer{} di, err := newDiagnosticInfo(c.ident.agentID, c.ident.version) if err != nil { diff --git a/opamp/observiq/support.go b/opamp/observiq/support.go index d88cc7c04..3c0dcd04d 100644 --- a/opamp/observiq/support.go +++ b/opamp/observiq/support.go @@ -2,12 +2,15 @@ package observiq import ( "archive/tar" + "bytes" "compress/gzip" "fmt" "io" "os" "path/filepath" "runtime" + "runtime/pprof" + "time" "github.com/shirou/gopsutil/v3/host" "gopkg.in/yaml.v3" @@ -51,6 +54,7 @@ func writeSupportPackage(writer io.Writer, di diagnosticInfo) error { defer gzipWriter.Close() tw := tar.NewWriter(gzipWriter) + defer tw.Close() diYaml, err := yaml.Marshal(di) if err != nil { @@ -80,7 +84,19 @@ func writeSupportPackage(writer io.Writer, di diagnosticInfo) error { } } - return tw.Close() + err = writeCPUProfileToTar("cpu.pprof", tw) + if err != nil { + return fmt.Errorf("write cpu profile: %w", err) + } + + for _, p := range pprof.Profiles() { + err := writeProfileToTar(p.Name()+".pprof", p, tw) + if err != nil { + return fmt.Errorf("write profile %s: %w", p.Name(), err) + } + } + + return nil } func writeBytesToTar(file string, by []byte, tw *tar.Writer) error { @@ -89,6 +105,7 @@ func writeBytesToTar(file string, by []byte, tw *tar.Writer) error { Name: file, Size: int64(len(by)), Mode: 0666, + ModTime: time.Now(), }) if err != nil { return err @@ -119,6 +136,7 @@ func writeFileToTar(filePath, tarFile string, tw *tar.Writer) error { Name: tarFile, Size: fi.Size(), Mode: 0666, + ModTime: time.Now(), }) if err != nil { return fmt.Errorf("write tar header: %w", err) @@ -130,3 +148,54 @@ func writeFileToTar(filePath, tarFile string, tw *tar.Writer) error { return nil } + +func writeCPUProfileToTar(file string, tw *tar.Writer) error { + by := &bytes.Buffer{} + pprof.StartCPUProfile(by) + time.Sleep(15 * time.Second) + pprof.StopCPUProfile() + + err := tw.WriteHeader(&tar.Header{ + Typeflag: tar.TypeReg, + Name: file, + Size: int64(by.Len()), + Mode: 0666, + ModTime: time.Now(), + }) + if err != nil { + return err + } + + _, err = tw.Write(by.Bytes()) + if err != nil { + return err + } + + return nil +} + +func writeProfileToTar(file string, profile *pprof.Profile, tw *tar.Writer) error { + by := &bytes.Buffer{} + err := profile.WriteTo(by, 0) + if err != nil { + return fmt.Errorf("render profile: %w", err) + } + + err = tw.WriteHeader(&tar.Header{ + Typeflag: tar.TypeReg, + Name: file, + Size: int64(by.Len()), + Mode: 0666, + ModTime: time.Now(), + }) + if err != nil { + return err + } + + _, err = tw.Write(by.Bytes()) + if err != nil { + return err + } + + return nil +} From b9ad8685c22d8457826b70f472bb5fdcf5bc64ef Mon Sep 17 00:00:00 2001 From: Brandon Johnson Date: Thu, 17 Oct 2024 20:23:42 -0400 Subject: [PATCH 4/4] rename diagnostic info to agent-info.yaml --- opamp/observiq/support.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/opamp/observiq/support.go b/opamp/observiq/support.go index 3c0dcd04d..16e52ce47 100644 --- a/opamp/observiq/support.go +++ b/opamp/observiq/support.go @@ -62,7 +62,7 @@ func writeSupportPackage(writer io.Writer, di diagnosticInfo) error { } // Write basic agent info - if err := writeBytesToTar("diagnostic-info.yaml", diYaml, tw); err != nil { + if err := writeBytesToTar("agent-info.yaml", diYaml, tw); err != nil { return fmt.Errorf("write info yaml: %w", err) }