Skip to content
This repository was archived by the owner on Mar 19, 2025. It is now read-only.

Commit 9abb5f3

Browse files
authored
profiler: collect profiles concurrently (DataDog#1282)
* profiler: collect profiles concurrently (DataDog#1282)

The upcoming C allocation profile will work by starting a profile, waiting for the profile period to elapse, then stopping and collecting the output. This won't work if the profiles are collected synchronously, since the CPU profile is also collected this way, meaning there won't be enough time in a single profile period to collect both. So the profiles need to be collected concurrently.

With concurrent profile collection, all profiles should block so that they're collected at the end of the profile period. This preserves the property that all profiles in a batch cover the same time, so events in one profile correspond to events in another.

Along the way, a few tests needed to be fixed so they don't take 1 minute (the default profiling period) to complete. There was also a potential panic in TestAllUploaded if the sync.WaitGroup count goes negative due to multiple uploads going through before profiling is stopped.

Also fixed unbounded growth of the collected-profiles slice. Whoops! Made TestAllUploaded check the *second* batch of profiles to catch this bug.
1 parent 46b9da9 commit 9abb5f3

File tree

4 files changed

+63
-17
lines changed

4 files changed

+63
-17
lines changed

profiler/profile.go

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -160,7 +160,9 @@ func collectGenericProfile(name string, delta *pprofutils.Delta) func(p *profiler) ([]byte, error)
160160
return func(p *profiler) ([]byte, error) {
161161
var extra []*pprofile.Profile
162162
// TODO: add type safety for name == "heap" check and remove redunancy with profileType.Name.
163-
if cAlloc, ok := extensions.GetCAllocationProfiler(); ok && p.cfg.deltaProfiles && name == "heap" {
163+
cAlloc, ok := extensions.GetCAllocationProfiler()
164+
switch {
165+
case ok && p.cfg.deltaProfiles && name == "heap":
164166
// For the heap profile, we'd also like to include C
165167
// allocations if that extension is enabled and have the
166168
// allocations show up in the same profile. Collect them
@@ -174,6 +176,11 @@ func collectGenericProfile(name string, delta *pprofutils.Delta) func(p *profiler) ([]byte, error)
174176
if err == nil {
175177
extra = append(extra, profile)
176178
}
179+
default:
180+
// In all cases, sleep until the end of the profile
181+
// period so that all profiles cover the same period of
182+
// time
183+
p.interruptibleSleep(p.cfg.period)
177184
}
178185

179186
var buf bytes.Buffer
@@ -272,7 +279,10 @@ func (p *profiler) deltaProfile(name string, delta *pprofutils.Delta, curData []
272279
return nil, fmt.Errorf("delta prof parse: %v", err)
273280
}
274281
var deltaData []byte
275-
if prevProf := p.prev[name]; prevProf == nil {
282+
p.mu.Lock()
283+
prevProf := p.prev[name]
284+
p.mu.Unlock()
285+
if prevProf == nil {
276286
// First time deltaProfile gets called for a type, there is no prevProf. In
277287
// this case we emit the current profile as a delta profile.
278288
deltaData = curData
@@ -298,7 +308,9 @@ func (p *profiler) deltaProfile(name string, delta *pprofutils.Delta, curData []
298308
}
299309
// Keep the most recent profiles in memory for future diffing. This needs to
300310
// be taken into account when enforcing memory limits going forward.
311+
p.mu.Lock()
301312
p.prev[name] = curProf
313+
p.mu.Unlock()
302314
return &profile{data: deltaData}, nil
303315
}
304316

profiler/profile_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,7 @@ main;bar 0 0 8 16
117117
// followed by prof2 when calling runProfile().
118118
deltaProfiler := func(prof1, prof2 []byte, opts ...Option) (*profiler, func()) {
119119
returnProfs := [][]byte{prof1, prof2}
120+
opts = append(opts, WithPeriod(5*time.Millisecond))
120121
p, err := unstartedProfiler(opts...)
121122
p.testHooks.lookupProfile = func(_ string, w io.Writer, _ int) error {
122123
_, err := w.Write(returnProfs[0])
@@ -189,7 +190,7 @@ main;bar 0 0 8 16
189190
})
190191

191192
t.Run("goroutine", func(t *testing.T) {
192-
p, err := unstartedProfiler()
193+
p, err := unstartedProfiler(WithPeriod(time.Millisecond))
193194
p.testHooks.lookupProfile = func(name string, w io.Writer, _ int) error {
194195
_, err := w.Write([]byte(name))
195196
return err

profiler/profiler.go

Lines changed: 24 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@ func Stop() {
6363
// profiler collects and sends preset profiles to the Datadog API at a given frequency
6464
// using a given configuration.
6565
type profiler struct {
66+
mu sync.Mutex
6667
cfg *config // profile configuration
6768
out chan batch // upload queue
6869
uploadFunc func(batch) error // defaults to (*profiler).upload; replaced in tests
@@ -211,6 +212,12 @@ func (p *profiler) run() {
211212
// an item.
212213
func (p *profiler) collect(ticker <-chan time.Time) {
213214
defer close(p.out)
215+
var (
216+
// mu guards completed
217+
mu sync.Mutex
218+
completed []*profile
219+
wg sync.WaitGroup
220+
)
214221
for {
215222
select {
216223
case <-ticker:
@@ -225,16 +232,24 @@ func (p *profiler) collect(ticker <-chan time.Time) {
225232
end: now.Add(p.cfg.cpuDuration),
226233
}
227234

235+
completed = completed[:0]
228236
for _, t := range p.enabledProfileTypes() {
229-
profs, err := p.runProfile(t)
230-
if err != nil {
231-
log.Error("Error getting %s profile: %v; skipping.", t, err)
232-
p.cfg.statsd.Count("datadog.profiler.go.collect_error", 1, append(p.cfg.tags, t.Tag()), 1)
233-
continue
234-
}
235-
for _, prof := range profs {
236-
bat.addProfile(prof)
237-
}
237+
wg.Add(1)
238+
go func(t ProfileType) {
239+
defer wg.Done()
240+
profs, err := p.runProfile(t)
241+
if err != nil {
242+
log.Error("Error getting %s profile: %v; skipping.", t, err)
243+
p.cfg.statsd.Count("datadog.profiler.go.collect_error", 1, append(p.cfg.tags, t.Tag()), 1)
244+
}
245+
mu.Lock()
246+
defer mu.Unlock()
247+
completed = append(completed, profs...)
248+
}(t)
249+
}
250+
wg.Wait()
251+
for _, prof := range completed {
252+
bat.addProfile(prof)
238253
}
239254
p.enqueueUpload(bat)
240255
case <-p.exit:

profiler/profiler_test.go

Lines changed: 23 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -213,7 +213,10 @@ func TestStopLatency(t *testing.T) {
213213
stopped <- struct{}{}
214214
}()
215215

216-
timeout := 20 * time.Millisecond
216+
// CPU profiling polls in 100 millisecond intervals and this can't be
217+
// interrupted by pprof.StopCPUProfile, so we can't guarantee profiling
218+
// will stop faster than that.
219+
timeout := 200 * time.Millisecond
217220
select {
218221
case <-stopped:
219222
case <-time.After(timeout):
@@ -227,6 +230,7 @@ func TestStopLatency(t *testing.T) {
227230
func TestProfilerInternal(t *testing.T) {
228231
t.Run("collect", func(t *testing.T) {
229232
p, err := unstartedProfiler(
233+
WithPeriod(1*time.Millisecond),
230234
CPUDuration(1*time.Millisecond),
231235
WithProfileTypes(HeapProfile, CPUProfile),
232236
)
@@ -344,17 +348,31 @@ func TestAllUploaded(t *testing.T) {
344348
//
345349
// TODO: Further check that the uploaded profiles are all valid
346350
var (
347-
wg sync.WaitGroup
348351
profiles []string
349352
)
353+
// received indicates that the server has received a profile upload.
354+
// This is used similarly to a sync.WaitGroup but avoids a potential
355+
// panic if too many requests are received before profiling is stopped
356+
// and the WaitGroup count goes negative.
357+
//
358+
// The channel is buffered with 2 entries so we can check that the
359+
// second batch of profiles is correct in case the profiler gets in a
360+
// bad state after the first round of profiling.
361+
received := make(chan struct{}, 2)
350362
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
351-
defer wg.Done()
363+
defer func() {
364+
select {
365+
case received <- struct{}{}:
366+
default:
367+
}
368+
}()
352369
_, params, err := mime.ParseMediaType(r.Header.Get("Content-Type"))
353370
if err != nil {
354371
t.Fatalf("bad media type: %s", err)
355372
return
356373
}
357374
mr := multipart.NewReader(r.Body, params["boundary"])
375+
profiles = profiles[:0]
358376
for {
359377
p, err := mr.NextPart()
360378
if err == io.EOF {
@@ -369,7 +387,6 @@ func TestAllUploaded(t *testing.T) {
369387
}
370388
}))
371389
defer server.Close()
372-
wg.Add(1)
373390

374391
// re-implemented testing.T.Setenv since that function requires Go 1.17
375392
old, ok := os.LookupEnv("DD_PROFILING_WAIT_PROFILE")
@@ -392,7 +409,8 @@ func TestAllUploaded(t *testing.T) {
392409
CPUDuration(1*time.Millisecond),
393410
)
394411
defer Stop()
395-
wg.Wait()
412+
<-received
413+
<-received
396414

397415
expected := []string{
398416
"data[cpu.pprof]",

0 commit comments

Comments (0)