-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathcollect.go
127 lines (113 loc) · 2.82 KB
/
collect.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
package cpuprofile
import (
"context"
"errors"
"io"
"sync"
"time"
"github.com/google/pprof/profile"
)
// DefProfileDuration is exported so that tests can adjust it.
// StopCPUProfile waits at most twice this duration for the first batch of
// profile data to be read before it stops collecting.
var DefProfileDuration = time.Second
// Collector is a CPU profile collector; it collects CPU profile data from globalCPUProfiler.
// Create one with NewCollector, then drive it with StartCPUProfile and StopCPUProfile.
type Collector struct {
// ctx is cancelled through cancel to make the reader goroutine exit.
ctx context.Context
// writer is the destination passed to StartCPUProfile; StopCPUProfile writes the merged profile to it.
writer io.Writer
// err is the error recorded by the reader goroutine while handling profile data, if any.
err error
// cancel cancels ctx; StopCPUProfile calls it.
cancel context.CancelFunc
// firstRead is closed by the reader goroutine once the first batch of profile data has been handled.
firstRead chan struct{}
// dataCh is the channel registered with globalCPUProfiler on which profile data is received.
dataCh ProfileConsumer
// result is the merged profile built from the batches received so far; nil until one has been parsed.
result *profile.Profile
// wg tracks the reader goroutine so StopCPUProfile can wait for it to exit.
wg sync.WaitGroup
// started is set by StartCPUProfile and is used to reject a second start.
started bool
}
// NewCollector returns a Collector that is ready to be started with
// StartCPUProfile. The collector owns a cancellable context derived from
// context.Background, an open firstRead signal channel, and a data channel
// with a buffer of one.
func NewCollector() *Collector {
	c := &Collector{
		firstRead: make(chan struct{}),
		dataCh:    make(ProfileConsumer, 1),
	}
	c.ctx, c.cancel = context.WithCancel(context.Background())
	return c
}
// StartCPUProfile plays the role of `pprof.StartCPUProfile` for this package.
// Call it instead of `pprof.StartCPUProfile`; otherwise the call may fail, or
// interfere with the TopSQL feature and the pprof profile HTTP API.
//
// It remembers w as the destination of the profile and launches the goroutine
// that reads profile data. It returns an error if the collector has already
// been started.
// WARN: this function is not thread-safe.
func (pc *Collector) StartCPUProfile(w io.Writer) error {
	if pc.started {
		return errors.New("Collector already started")
	}

	pc.started, pc.writer = true, w

	// The reader goroutine calls pc.wg.Done when it exits.
	pc.wg.Add(1)
	go pc.readProfileData()
	return nil
}
// StopCPUProfile plays the role of `pprof.StopCPUProfile` for this package.
//
// On a collector that was never started it does nothing and returns nil.
// Otherwise it waits until the first batch of profile data has been read, or
// until 2*DefProfileDuration has elapsed, then stops the reader goroutine and
// writes the merged profile to the writer given to StartCPUProfile.
//
// It returns the error recorded while collecting, if any, or else the error
// from writing the profile. When no data was collected, nothing is written
// and nil is returned.
// WARN: this function is not thread-safe.
func (pc *Collector) StopCPUProfile() error {
	if !pc.started {
		return nil
	}

	// Give the reader a chance to receive at least one batch of profile data.
	timeout := time.After(DefProfileDuration * 2)
	select {
	case <-pc.firstRead:
	case <-timeout:
	}

	// Stop the reader goroutine and wait for it to exit before touching
	// the collected result.
	pc.cancel()
	pc.wg.Wait()

	merged, err := pc.buildProfileData()
	switch {
	case err != nil:
		return err
	case merged == nil:
		return nil
	}
	return merged.Write(pc.writer)
}
// readProfileData is the body of the reader goroutine launched by
// StartCPUProfile. It registers pc.dataCh with globalCPUProfiler and folds
// every batch it receives into pc.result, until the collector's context is
// cancelled or a batch cannot be handled; in the latter case the error is
// kept in pc.err. On exit it unregisters and closes pc.dataCh and marks the
// wait group as done.
//
// pc.firstRead is closed as soon as the first batch has been handled, whether
// or not handling succeeded, so that StopCPUProfile does not sit out its full
// timeout when the reader has already exited on an error.
func (pc *Collector) readProfileData() {
	// register cpu profile consumer.
	globalCPUProfiler.register(pc.dataCh)
	defer func() {
		globalCPUProfiler.unregister(pc.dataCh)
		close(pc.dataCh)
		pc.wg.Done()
	}()

	pc.result, pc.err = nil, nil
	firstRead := true
	for {
		select {
		case <-pc.ctx.Done():
			return
		case data := <-pc.dataCh:
			pc.err = pc.handleProfileData(data)
			// Signal the first read before checking the error: pc.err is
			// only read by StopCPUProfile after the goroutine has exited.
			if firstRead {
				firstRead = false
				close(pc.firstRead)
			}
			if pc.err != nil {
				return
			}
		}
	}
}
// handleProfileData folds one batch of profile data into pc.result.
//
// It returns data.Error when the batch carries one, or the error from parsing
// or merging the batch. The first successfully parsed batch becomes pc.result
// as is; every later batch is merged with it, and pc.result is replaced by
// whatever profile.Merge returns, even when it also returns an error.
func (pc *Collector) handleProfileData(data *ProfileData) error {
	if data.Error != nil {
		return data.Error
	}

	parsed, err := profile.ParseData(data.Data.Bytes())
	if err != nil {
		return err
	}

	if pc.result != nil {
		pc.result, err = profile.Merge([]*profile.Profile{pc.result, parsed})
		return err
	}
	pc.result = parsed
	return nil
}
// buildProfileData returns the profile merged so far.
// It returns (nil, pc.err) when an error was recorded while collecting, and
// (nil, nil) when no profile data has been collected.
func (pc *Collector) buildProfileData() (*profile.Profile, error) {
	if err := pc.err; err != nil {
		return nil, err
	}
	// pc.result is nil when nothing was collected, which yields (nil, nil).
	return pc.result, nil
}