diff --git a/go.mod b/go.mod index e1c7ba572..104b42abb 100644 --- a/go.mod +++ b/go.mod @@ -199,6 +199,8 @@ require ( gopkg.in/yaml.v3 v3.0.1 ) +require github.com/open-telemetry/opentelemetry-collector-contrib/extension/opampcustommessages v0.111.0 + require ( cel.dev/expr v0.16.0 // indirect cloud.google.com/go/auth v0.9.8 // indirect diff --git a/go.sum b/go.sum index 6092a8bf9..69ff74774 100644 --- a/go.sum +++ b/go.sum @@ -2004,6 +2004,8 @@ github.com/open-telemetry/opentelemetry-collector-contrib/extension/oauth2client github.com/open-telemetry/opentelemetry-collector-contrib/extension/oauth2clientauthextension v0.111.0/go.mod h1:66TKY0RWUw9GGZ8SQrr1CyXL17Z9eRvGfsY1WBkndMk= github.com/open-telemetry/opentelemetry-collector-contrib/extension/oidcauthextension v0.111.0 h1:XvmX2mHl4Y4qKhRGfGbhLhfLg/+gSpcXhO6WcgQe2JM= github.com/open-telemetry/opentelemetry-collector-contrib/extension/oidcauthextension v0.111.0/go.mod h1:I/Lk+XENTjQkO8ipfc12eA2LHxUc/zNJTJkvf64tnG4= +github.com/open-telemetry/opentelemetry-collector-contrib/extension/opampcustommessages v0.111.0 h1:cZVtc0BkL4iHOuUP28f+LuJiueUmd/MyYULt21YzQkY= +github.com/open-telemetry/opentelemetry-collector-contrib/extension/opampcustommessages v0.111.0/go.mod h1:wBrMy+mgk1llwg2zufyS6VNVZxavBQH97zJcxiDEWz8= github.com/open-telemetry/opentelemetry-collector-contrib/extension/pprofextension v0.111.0 h1:m/u2iRttl/nEjp0EZ9w371LLAqogw0tDn+wPU6D7ArY= github.com/open-telemetry/opentelemetry-collector-contrib/extension/pprofextension v0.111.0/go.mod h1:mmkCl306sRZYt/7uNmjvuRRvbe/xUDSDm8fEAGSSMKI= github.com/open-telemetry/opentelemetry-collector-contrib/extension/sigv4authextension v0.111.0 h1:9k20EsVLgDoVxnJjp9Hg4Eosj5E+A77D5lr01fGQk0A= diff --git a/opamp/observiq/observiq_client.go b/opamp/observiq/observiq_client.go index 9c250425d..570dbfcf5 100644 --- a/opamp/observiq/observiq_client.go +++ b/opamp/observiq/observiq_client.go @@ -16,6 +16,7 @@ package observiq import ( + "bytes" "context" "encoding/hex" "errors" @@ -35,6 +36,7 @@ import ( "github.com/open-telemetry/opamp-go/client/types" "github.com/open-telemetry/opamp-go/protobufs" "go.uber.org/zap" + "gopkg.in/yaml.v3" ) var ( @@ -76,6 +78,8 @@ type Client struct { collectorMntrWg sync.WaitGroup currentConfig opamp.Config + + supportPackageMux sync.Mutex } // NewClientArgs arguments passed when creating a new client @@ -124,6 +128,7 @@ func NewClient(args *NewClientArgs) (opamp.Client, error) { packagesStateProvider: newPackagesStateProvider(clientLogger, packagestate.DefaultFileName), updaterManager: updaterManger, reportManager: reportManager, + supportPackageMux: sync.Mutex{}, } // Parse URL to determin scheme @@ -149,6 +154,7 @@ func NewClient(args *NewClientArgs) (opamp.Client, error) { err = observiqClient.opampClient.SetCustomCapabilities(&protobufs.CustomCapabilities{ Capabilities: []string{ measurements.ReportMeasurementsV1Capability, + diagnosticsReportV1Capability, }, }) if err != nil { @@ -350,6 +356,15 @@ func (c *Client) onMessageFuncHandler(ctx context.Context, msg *types.MessageDat c.measurementsSender.Stop() } } + if msg.CustomMessage != nil { + msgCapability := msg.CustomMessage.GetCapability() + msgType := msg.CustomMessage.GetType() + + if msgCapability == diagnosticsReportV1Capability && + msgType == diagnosticsRequestType { + go c.handleDiagnosticPackageRequest(msg.CustomMessage.GetData()) + } + } } func (c *Client) onRemoteConfigHandler(ctx context.Context, remoteConfig *protobufs.AgentRemoteConfig) error { @@ -726,3 +741,53 @@ func (c *Client) safeGetDisconnecting() bool { defer c.mutex.Unlock() return c.disconnecting } + +func (c *Client) handleDiagnosticPackageRequest(data []byte) { + // Only one diagnostic package should be generated at a time. + c.supportPackageMux.Lock() + defer c.supportPackageMux.Unlock() + + var req diagnosticRequestCustomMessage + err := yaml.Unmarshal(data, &req) + if err != nil { + c.logger.Error("Failed to unmarshal diagnostic request.", zap.Error(err)) + return + } + + // TODO: Support streaming (don't need to read full log files into memory) + // We can do this with io.Pipe I think + buf := &bytes.Buffer{} + di, err := newDiagnosticInfo(c.ident.agentID, c.ident.version) + if err != nil { + c.logger.Error("Failed to create diagnostic info.", zap.Error(err)) + return + } + + err = writeSupportPackage(buf, di) + if err != nil { + c.logger.Error("Failed to create unmarshal request.", zap.Error(err)) + return + } + + httpReq, err := http.NewRequestWithContext(context.TODO(), "PUT", req.ReportURL, buf) + if err != nil { + c.logger.Error("Failed to create http request.", zap.Error(err)) + return + } + + for k, v := range req.Headers { + httpReq.Header.Add(k, v) + } + + resp, err := http.DefaultClient.Do(httpReq) + if err != nil { + c.logger.Error("Failed to PUT http request.", zap.Error(err)) + return + } + defer resp.Body.Close() + + if resp.StatusCode < 200 || resp.StatusCode > 299 { + c.logger.Error("Diagnostic PUT returned bad status code", zap.Int("status_code", resp.StatusCode)) + return + } +} diff --git a/opamp/observiq/support.go b/opamp/observiq/support.go new file mode 100644 index 000000000..16e52ce47 --- /dev/null +++ b/opamp/observiq/support.go @@ -0,0 +1,201 @@ +package observiq + +import ( + "archive/tar" + "bytes" + "compress/gzip" + "fmt" + "io" + "os" + "path/filepath" + "runtime" + "runtime/pprof" + "time" + + "github.com/shirou/gopsutil/v3/host" + "gopkg.in/yaml.v3" +) + +const ( + diagnosticsReportV1Capability = "com.bindplane.diagnostics.v1" + diagnosticsRequestType = "requestDiagnosticPackage" +) + +type diagnosticRequestCustomMessage struct { + ReportURL string `yaml:"report_url"` + Headers map[string]string `yaml:"headers"` +} + +type diagnosticInfo struct { + AgentID string + Version string + Goos string + GoArch string + HostInfo *host.InfoStat +} + +func newDiagnosticInfo(agentID, version string) (diagnosticInfo, error) { + hi, err := host.Info() + if err != nil { + return diagnosticInfo{}, fmt.Errorf("stat hostinfo: %w", err) + } + + return diagnosticInfo{ + AgentID: agentID, + Version: version, + Goos: runtime.GOOS, + GoArch: runtime.GOARCH, + HostInfo: hi, + }, nil +} + +func writeSupportPackage(writer io.Writer, di diagnosticInfo) error { + gzipWriter := gzip.NewWriter(writer) + defer gzipWriter.Close() + + tw := tar.NewWriter(gzipWriter) + defer tw.Close() + + diYaml, err := yaml.Marshal(di) + if err != nil { + return err + } + + // Write basic agent info + if err := writeBytesToTar("agent-info.yaml", diYaml, tw); err != nil { + return fmt.Errorf("write info yaml: %w", err) + } + + // Write log files + home := os.Getenv("OIQ_OTEL_COLLECTOR_HOME") + logsDir := filepath.Join(home, "log") + + logsDirEntries, err := os.ReadDir(logsDir) + if err != nil { + return fmt.Errorf("read logs dir entries: %w", err) + } + for _, ent := range logsDirEntries { + if !ent.IsDir() { + path := filepath.Join(logsDir, ent.Name()) + err := writeFileToTar(path, ent.Name(), tw) + if err != nil { + return fmt.Errorf("write log files: %w", err) + } + } + } + + err = writeCPUProfileToTar("cpu.pprof", tw) + if err != nil { + return fmt.Errorf("write cpu profile: %w", err) + } + + for _, p := range pprof.Profiles() { + err := writeProfileToTar(p.Name()+".pprof", p, tw) + if err != nil { + return fmt.Errorf("write profile %s: %w", p.Name(), err) + } + } + + return nil +} + +func writeBytesToTar(file string, by []byte, tw *tar.Writer) error { + err := tw.WriteHeader(&tar.Header{ + Typeflag: tar.TypeReg, + Name: file, + Size: int64(len(by)), + Mode: 0666, + ModTime: time.Now(), + }) + if err != nil { + return err + } + + _, err = tw.Write(by) + if err != nil { + return err + } + + return nil +} + +func writeFileToTar(filePath, tarFile string, tw *tar.Writer) error { + f, err := os.Open(filePath) + if err != nil { + return fmt.Errorf("open file: %w", err) + } + defer f.Close() + + fi, err := f.Stat() + if err != nil { + return fmt.Errorf("stat file: %w", err) + } + + err = tw.WriteHeader(&tar.Header{ + Typeflag: tar.TypeReg, + Name: tarFile, + Size: fi.Size(), + Mode: 0666, + ModTime: time.Now(), + }) + if err != nil { + return fmt.Errorf("write tar header: %w", err) + } + + if _, err = io.Copy(tw, f); err != nil { + return fmt.Errorf("copy file to tar: %w", err) + } + + return nil +} + +func writeCPUProfileToTar(file string, tw *tar.Writer) error { + by := &bytes.Buffer{} + pprof.StartCPUProfile(by) + time.Sleep(15 * time.Second) + pprof.StopCPUProfile() + + err := tw.WriteHeader(&tar.Header{ + Typeflag: tar.TypeReg, + Name: file, + Size: int64(by.Len()), + Mode: 0666, + ModTime: time.Now(), + }) + if err != nil { + return err + } + + _, err = tw.Write(by.Bytes()) + if err != nil { + return err + } + + return nil +} + +func writeProfileToTar(file string, profile *pprof.Profile, tw *tar.Writer) error { + by := &bytes.Buffer{} + err := profile.WriteTo(by, 0) + if err != nil { + return fmt.Errorf("render profile: %w", err) + } + + err = tw.WriteHeader(&tar.Header{ + Typeflag: tar.TypeReg, + Name: file, + Size: int64(by.Len()), + Mode: 0666, + ModTime: time.Now(), + }) + if err != nil { + return err + } + + _, err = tw.Write(by.Bytes()) + if err != nil { + return err + } + + return nil +}