diff --git a/profiler/options.go b/profiler/options.go index 994ab21ecd..df1b1ab619 100644 --- a/profiler/options.go +++ b/profiler/options.go @@ -112,6 +112,7 @@ type config struct { traceConfig executionTraceConfig endpointCountEnabled bool enabled bool + flushOnExit bool } // logStartup records the configuration to the configured logger in JSON format @@ -148,6 +149,7 @@ func logStartup(c *config) { "endpoint_count_enabled": c.endpointCountEnabled, "custom_profiler_label_keys": c.customProfilerLabels, "enabled": c.enabled, + "flush_on_exit": c.flushOnExit, } b, err := json.Marshal(info) if err != nil { @@ -242,6 +244,7 @@ func defaultConfig() (*config, error) { if v := os.Getenv("DD_VERSION"); v != "" { WithVersion(v)(&c) } + c.flushOnExit = internal.BoolEnv("DD_PROFILING_FLUSH_ON_EXIT", false) tags := make(map[string]string) if v := os.Getenv("DD_TAGS"); v != "" { diff --git a/profiler/profiler.go b/profiler/profiler.go index 9c7ae8cf17..e694527aae 100644 --- a/profiler/profiler.go +++ b/profiler/profiler.go @@ -295,7 +295,8 @@ func (p *profiler) collect(ticker <-chan time.Time) { endpointCounter.GetAndReset() }() - for { + exit := false + for !exit { bat := batch{ seq: p.seq, host: p.cfg.hostname, @@ -384,7 +385,11 @@ func (p *profiler) collect(ticker <-chan time.Time) { // is less than the configured profiling period, the ticker will block // until the end of the profiling period. case <-p.exit: - return + if !p.cfg.flushOnExit { + return + } + // If we're flushing, we enqueue the batch before exiting the loop. + exit = true } // Include endpoint hits from tracer in profile `event.json`. @@ -457,8 +462,13 @@ func (p *profiler) send() { for { select { case <-p.exit: - return - case bat := <-p.out: + if !p.cfg.flushOnExit { + return + } + case bat, ok := <-p.out: + if !ok { + return + } if err := p.outputDir(bat); err != nil { log.Error("Failed to output profile to dir: %v", err) } diff --git a/profiler/profiler_test.go b/profiler/profiler_test.go index d8b3b1e182..c073833ff3 100644 --- a/profiler/profiler_test.go +++ b/profiler/profiler_test.go @@ -22,6 +22,7 @@ import ( "runtime/trace" "strconv" "strings" + "sync/atomic" "testing" "time" @@ -233,6 +234,59 @@ func TestStopLatency(t *testing.T) { } } +func TestFlushAndStop(t *testing.T) { + t.Setenv("DD_PROFILING_FLUSH_ON_EXIT", "1") + received := startTestProfiler(t, 1, + WithProfileTypes(CPUProfile, HeapProfile), + WithPeriod(time.Hour), + WithUploadTimeout(time.Hour)) + + Stop() + + select { + case prof := <-received: + if len(prof.attachments["cpu.pprof"]) == 0 { + t.Errorf("expected CPU profile, got none") + } + if len(prof.attachments["delta-heap.pprof"]) == 0 { + t.Errorf("expected heap profile, got none") + } + case <-time.After(5 * time.Second): + t.Fatalf("profiler did not flush") + } +} + +func TestFlushAndStopTimeout(t *testing.T) { + uploadTimeout := 1 * time.Second + var requests atomic.Int32 + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if h := r.Header.Get("DD-Telemetry-Request-Type"); len(h) > 0 { + return + } + requests.Add(1) + time.Sleep(2 * uploadTimeout) + })) + defer server.Close() + + t.Setenv("DD_PROFILING_FLUSH_ON_EXIT", "1") + Start( + WithAgentAddr(server.Listener.Addr().String()), + WithPeriod(time.Hour), + WithUploadTimeout(uploadTimeout), + ) + + start := time.Now() + Stop() + + elapsed := time.Since(start) + if elapsed > (maxRetries*uploadTimeout)+1*time.Second { + t.Errorf("profiler took %v to stop", elapsed) + } + if requests.Load() != maxRetries { + t.Errorf("expected %d requests, got %d", maxRetries, requests.Load()) + } +} + func TestSetProfileFraction(t *testing.T) { t.Run("on", func(t *testing.T) { start := runtime.SetMutexProfileFraction(0) diff --git a/profiler/telemetry.go b/profiler/telemetry.go index 0dc2cf9be8..a53367f33a 100644 --- a/profiler/telemetry.go +++ b/profiler/telemetry.go @@ -55,6 +55,7 @@ func startTelemetry(c *config) { {Name: "endpoint_count_enabled", Value: c.endpointCountEnabled}, {Name: "num_custom_profiler_label_keys", Value: len(c.customProfilerLabels)}, {Name: "enabled", Value: c.enabled}, + {Name: "flush_on_exit", Value: c.flushOnExit}, }, ) } diff --git a/profiler/upload.go b/profiler/upload.go index a8b98f6560..6d736fc1be 100644 --- a/profiler/upload.go +++ b/profiler/upload.go @@ -37,7 +37,9 @@ func (p *profiler) upload(bat batch) error { for i := 0; i < maxRetries; i++ { select { case <-p.exit: - return nil + if !p.cfg.flushOnExit { + return nil + } default: } @@ -98,6 +100,9 @@ func (p *profiler) doRequest(bat batch) error { go func() { select { case <-p.exit: + if p.cfg.flushOnExit { + return + } case <-funcExit: } cancel()