Skip to content

Commit 988d91f

Browse files
committed
Merge branch '212-io-bpf' into 'master'
feat: calculate reads taking into account ZFS ARC # Description - collect biosnoop-bpfcc (https://github.com/iovisor/bcc/tree/master/tools) output during SQL run - filter the output by process ID - calculate actual reads from block device: - considering parallel workers - excluding processes from neighboring containers https://gitlab.com/postgres-ai/database-lab/-/issues/212#note_501509635 # Related issue #212 See merge request postgres-ai/database-lab!273
2 parents 2a6902b + d940d97 commit 988d91f

File tree

9 files changed

+509
-91
lines changed

9 files changed

+509
-91
lines changed

Dockerfile.dblab-server

+7
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,13 @@ FROM docker:19.03.14
3333

3434
# Install dependencies.
3535
RUN apk update && apk add --no-cache zfs lvm2 bash util-linux
36+
RUN echo 'http://dl-cdn.alpinelinux.org/alpine/v3.13/main' >> /etc/apk/repositories \
37+
&& echo 'http://dl-cdn.alpinelinux.org/alpine/v3.13/community' >> /etc/apk/repositories \
38+
&& apk add bcc-tools=0.18.0-r0 bcc-doc=0.18.0-r0 && ln -s $(which python3) /usr/bin/python \
39+
# TODO: remove this workaround once PR https://github.com/iovisor/bcc/pull/3286 is released (issue: https://github.com/iovisor/bcc/issues/3099)
40+
&& wget https://raw.githubusercontent.com/iovisor/bcc/master/tools/biosnoop.py -O /usr/share/bcc/tools/biosnoop
41+
42+
ENV PATH="${PATH}:/usr/share/bcc/tools"
3643

3744
WORKDIR /home/dblab
3845

pkg/estimator/monitor.go

+247
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,247 @@
1+
/*
2+
2021 © Postgres.ai
3+
*/
4+
5+
package estimator
6+
7+
import (
	"bufio"
	"bytes"
	"context"
	"fmt"
	"io"
	"os"
	"os/exec"
	"regexp"
	"strconv"
	"sync/atomic"

	"github.com/pkg/errors"

	"gitlab.com/postgres-ai/database-lab/v2/pkg/log"
)
22+
23+
const (
	// regExp matches one data row of the I/O tracer output. It captures, in
	// order: the PID that issued the operation, the operation type (W or R)
	// and the size of the operation in bytes.
	regExp = `^[.0-9]+\s+\S+\s+(\d+)\s+\w+\s+(W|R)\s+\d+\s+(\d+)\s+[.0-9]+$`

	// countMatches is the number of submatches a matching row yields:
	// the whole match plus the three capture groups of regExp.
	countMatches = 4

	// expectedMappingParts is the minimum number of values an "NSpid:" line
	// must carry for a PID mapping to be extracted from it.
	expectedMappingParts = 2

	// procDir is the root-level directory from which per-process files
	// (cgroup, cmdline, status) are read.
	// NOTE(review): presumably the host procfs mounted into the container — confirm.
	procDir = "host_proc"

	// parallelWorkerCmdline is the command-line marker that, followed by a
	// PID, identifies a parallel worker started for that PID.
	parallelWorkerCmdline = "parallel worker for PID "
)

var (
	// r is the compiled form of regExp; it is built once at package scope.
	r = regexp.MustCompile(regExp)

	// nsPrefix is the prefix of the process status line that lists PIDs.
	nsPrefix = []byte("NSpid:")
)
35+
36+
// Monitor observes processes and system activity.
37+
type Monitor struct {
38+
pid int
39+
container string
40+
pidMapping map[int]int
41+
profiler *Profiler
42+
}
43+
44+
// NewMonitor creates a new monitor.
45+
func NewMonitor(pid int, container string, profiler *Profiler) *Monitor {
46+
return &Monitor{
47+
pid: pid,
48+
container: container,
49+
profiler: profiler,
50+
pidMapping: make(map[int]int),
51+
}
52+
}
53+
54+
// InspectIOBlocks counts physically read blocks.
55+
func (m *Monitor) InspectIOBlocks(ctx context.Context) error {
56+
log.Dbg("Start IO inspection")
57+
58+
cmd := exec.Command("biosnoop")
59+
60+
r, err := cmd.StdoutPipe()
61+
if err != nil {
62+
return err
63+
}
64+
65+
cmd.Stderr = cmd.Stdout
66+
67+
go m.scanOutput(ctx, r)
68+
69+
if err := cmd.Start(); err != nil {
70+
return errors.Wrap(err, "failed to run")
71+
}
72+
73+
<-m.profiler.exitChan
74+
75+
log.Dbg("Finish IO inspection")
76+
77+
return nil
78+
}
79+
80+
type bytesEntry struct {
81+
pid int
82+
totalBytes uint64
83+
}
84+
85+
func (m *Monitor) scanOutput(ctx context.Context, r io.Reader) {
86+
scanner := bufio.NewScanner(r)
87+
88+
for scanner.Scan() {
89+
scanBytes := scanner.Bytes()
90+
91+
if !bytes.Contains(scanBytes, []byte("postgres")) && !bytes.Contains(scanBytes, []byte("psql")) {
92+
continue
93+
}
94+
95+
bytesEntry := m.parseReadBytes(scanBytes)
96+
if bytesEntry == nil || bytesEntry.totalBytes == 0 {
97+
continue
98+
}
99+
100+
pid, ok := m.pidMapping[bytesEntry.pid]
101+
if !ok {
102+
hostPID, err := m.detectReferencedPID(bytesEntry.pid)
103+
m.pidMapping[bytesEntry.pid] = hostPID
104+
105+
if err != nil {
106+
continue
107+
}
108+
109+
pid = hostPID
110+
}
111+
112+
if pid != m.pid {
113+
continue
114+
}
115+
116+
atomic.AddUint64(&m.profiler.readBytes, bytesEntry.totalBytes)
117+
118+
select {
119+
case <-ctx.Done():
120+
log.Dbg(ctx.Err().Error())
121+
return
122+
123+
case <-m.profiler.exitChan:
124+
log.Dbg("finish to scan IO entries")
125+
return
126+
127+
default:
128+
}
129+
}
130+
}
131+
132+
func (m *Monitor) detectReferencedPID(pid int) (int, error) {
133+
hash, err := getContainerHash(pid)
134+
if err != nil {
135+
return 0, err
136+
}
137+
138+
if hash == "" || !m.isAppropriateContainer(hash) {
139+
return 0, nil
140+
}
141+
142+
procParallel, err := exec.Command("cat", fmt.Sprintf("/%s/%d/cmdline", procDir, pid)).Output()
143+
if err != nil {
144+
return 0, err
145+
}
146+
147+
if bytes.Contains(procParallel, []byte("postgres")) &&
148+
bytes.Contains(procParallel, []byte(parallelWorkerCmdline+strconv.Itoa(m.pid))) {
149+
return m.pid, nil
150+
}
151+
152+
procStatus, err := exec.Command("cat", fmt.Sprintf("/%s/%d/status", procDir, pid)).Output()
153+
if err != nil {
154+
return 0, err
155+
}
156+
157+
return m.parsePIDMapping(procStatus)
158+
}
159+
160+
func (m *Monitor) isAppropriateContainer(hash string) bool {
161+
return m.container == hash
162+
}
163+
164+
func (m *Monitor) parsePIDMapping(procStatus []byte) (int, error) {
165+
sc := bufio.NewScanner(bytes.NewBuffer(procStatus))
166+
167+
for sc.Scan() {
168+
line := sc.Bytes()
169+
if !bytes.HasPrefix(line, nsPrefix) {
170+
continue
171+
}
172+
173+
nsPID := bytes.TrimSpace(bytes.TrimPrefix(line, nsPrefix))
174+
175+
pidValues := bytes.Fields(nsPID)
176+
if len(pidValues) < expectedMappingParts {
177+
return 0, nil
178+
}
179+
180+
hostPID, err := strconv.Atoi(string(bytes.TrimSpace(pidValues[1])))
181+
if err != nil {
182+
return 0, err
183+
}
184+
185+
return hostPID, nil
186+
}
187+
188+
return 0, nil
189+
}
190+
191+
func (m *Monitor) parseReadBytes(line []byte) *bytesEntry {
192+
submatch := r.FindSubmatch(line)
193+
if len(submatch) != countMatches {
194+
return nil
195+
}
196+
197+
totalBytes, err := strconv.ParseUint(string(submatch[3]), 10, 64)
198+
if err != nil {
199+
return nil
200+
}
201+
202+
pid, err := strconv.Atoi(string(submatch[1]))
203+
if err != nil {
204+
return nil
205+
}
206+
207+
return &bytesEntry{
208+
pid: pid,
209+
totalBytes: totalBytes,
210+
}
211+
}
212+
213+
func getContainerHash(pid int) (string, error) {
214+
procParallel, err := exec.Command("cat", fmt.Sprintf("/%s/%d/cgroup", procDir, pid)).Output()
215+
if err != nil {
216+
return "", err
217+
}
218+
219+
return detectContainerHash(procParallel), nil
220+
}
221+
222+
const (
	// procNamePrefix is the prefix of the cgroup line that is inspected.
	procNamePrefix = "1:name"

	// procDockerEntry is the path segment that precedes the container hash.
	procDockerEntry = "/docker/"
)

// detectContainerHash returns the part of the first cgroup line starting
// with procNamePrefix that follows the first occurrence of procDockerEntry.
// It returns an empty string when no line has that prefix or when the first
// such line does not contain procDockerEntry.
func detectContainerHash(procParallel []byte) string {
	prefix, marker := []byte(procNamePrefix), []byte(procDockerEntry)

	scanner := bufio.NewScanner(bytes.NewReader(procParallel))

	for scanner.Scan() {
		entry := scanner.Bytes()

		if bytes.HasPrefix(entry, prefix) {
			// Only the first prefixed line is considered, whether or not it
			// names a container.
			pos := bytes.Index(entry, marker)
			if pos < 0 {
				return ""
			}

			return string(entry[pos+len(marker):])
		}
	}

	return ""
}

0 commit comments

Comments
 (0)