Skip to content

Commit be819d1

Browse files
mknyszekgopherbot
authored andcommitted
trace: add experimental flight recorder API
This change adds an experimental flight recorder API that lives outside the Go runtime. It implements the same API as the proposed API, but has a few caveats because it exists outside the runtime. Firstly, because the conceptual circular buffer lives outside the runtime, this flight recorder has slightly more overhead than a runtime-internal implementation would. Specifically, all the trace data needs to be copied out of the runtime and gently processed, and this process needs to happen continuously. Peak memory use is also going to be higher because of this copying. Secondly, the flight recorder needs to flush the runtime's buffers twice in a row in order to obtain the snapshot it wants. This is because the signal in the trace that a generation is done is either that the trace stream ends or a batch with a new generation value appears. Flushing twice in a row ensures that the generation we actually wanted done is complete, at the cost of an additional flush. The overhead of this should be minimal in practice, but it does mean that the actual flush operation will have a substantially longer latency than with a runtime-internal implementation. This is OK because that latency doesn't actually affect any properties of the resulting snapshot; it's purely latency to the caller. This problem could have been avoided with an explicit in-band signal that a generation has been flushed, which we may want to consider adding in the future. For #63185. Change-Id: I7a94e2cddcfbf19a4140b398c188c3d59f8b9c9e Reviewed-on: https://go-review.googlesource.com/c/exp/+/550257 Auto-Submit: Michael Knyszek <[email protected]> LUCI-TryBot-Result: Go LUCI <[email protected]> Reviewed-by: Michael Pratt <[email protected]>
1 parent 02704c9 commit be819d1

File tree

2 files changed

+659
-0
lines changed

2 files changed

+659
-0
lines changed

trace/flightrecorder.go

+364
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,364 @@
1+
// Copyright 2023 The Go Authors. All rights reserved.
2+
// Use of this source code is governed by a BSD-style
3+
// license that can be found in the LICENSE file.
4+
5+
//go:build go1.22
6+
7+
package trace
8+
9+
import (
10+
"bufio"
11+
"encoding/binary"
12+
"fmt"
13+
"io"
14+
"math/bits"
15+
"runtime/trace"
16+
"slices"
17+
"sync"
18+
"time"
19+
_ "unsafe" // for go:linkname
20+
21+
"golang.org/x/exp/trace/internal/event/go122"
22+
)
23+
24+
// FlightRecorder represents a flight recording configuration.
25+
//
26+
// Flight recording holds execution trace data in a circular buffer representing
27+
// the most recent execution data.
28+
//
29+
// Only one flight recording may be active at any given time.
30+
type FlightRecorder struct {
31+
// State for coordinating with the recorder goroutine.
32+
fromTracer *io.PipeReader
33+
toRecorder *io.PipeWriter
34+
recorderWait sync.WaitGroup
35+
err error
36+
37+
// State specific to the recorder goroutine.
38+
header [16]byte
39+
active rawGeneration
40+
ringMu sync.Mutex
41+
ring []rawGeneration
42+
43+
// Externally-set options.
44+
targetSize int
45+
targetPeriod time.Duration
46+
47+
enabled bool // whether the flight recorder is enabled.
48+
writing sync.Mutex // protects concurrent calls to WriteTo
49+
50+
// The values of targetSize and targetPeriod we've committed to since the last Start.
51+
wantSize int
52+
wantDur time.Duration
53+
}
54+
55+
// NewFlightRecorder creates a new flight recording configuration.
56+
func NewFlightRecorder() *FlightRecorder {
57+
return &FlightRecorder{
58+
// These are just some optimistic, reasonable defaults.
59+
//
60+
// In reality we're also bound by whatever the runtime defaults are, because
61+
// we currently have no way to change them.
62+
//
63+
// TODO(mknyszek): Consider adding a function that allows mutating one or
64+
// both of these values' equivalents in the runtime.
65+
targetSize: 10 << 20, // 10 MiB.
66+
targetPeriod: 10 * time.Second,
67+
}
68+
}
69+
70+
// SetPeriod sets the approximate time duration that the flight recorder's circular buffer
71+
// represents.
72+
//
73+
// Note that SetPeriod does not make any guarantees on the amount of time the trace
74+
// produced by WriteTo will represent.
75+
// This is just a hint to the runtime to enable some control the resulting trace.
76+
//
77+
// The initial period is implementation defined, but can be assumed to be on the order
78+
// of seconds.
79+
//
80+
// Adjustments to this value will not apply to an active flight recorder, and will not apply
81+
// if tracing is already enabled via trace.Start. All tracing must be stopped and started
82+
// again to change this value.
83+
func (r *FlightRecorder) SetPeriod(d time.Duration) {
84+
r.targetPeriod = d
85+
}
86+
87+
// SetSize sets the approximate size of the flight recorder's circular buffer.
88+
//
89+
// This generally takes precedence over the duration passed to SetPeriod.
90+
// However, it does not make any guarantees on the size of the data WriteTo will write.
91+
// This is just a hint to the runtime to enable some control over the memory overheads
92+
// of tracing.
93+
//
94+
// The initial size is implementation defined.
95+
//
96+
// Adjustments to this value will not apply to an active flight recorder, and will not apply
97+
// if tracing is already enabled via trace.Start. All tracing must be stopped and started
98+
// again to change this value.
99+
func (r *FlightRecorder) SetSize(bytes int) {
100+
r.targetSize = bytes
101+
}
102+
103+
// Start begins flight recording. Only one flight recorder or one call to [runtime/trace.Start]
104+
// may be active at any given time. Returns an error if starting the flight recorder would
105+
// violate this rule.
106+
func (r *FlightRecorder) Start() error {
107+
if r.enabled {
108+
return fmt.Errorf("cannot enable a enabled flight recorder")
109+
}
110+
111+
r.wantSize = r.targetSize
112+
r.wantDur = r.targetPeriod
113+
r.err = nil
114+
r.fromTracer, r.toRecorder = io.Pipe()
115+
116+
// Start tracing, sending data to the recorder goroutine (not yet started) via an io.Pipe.
117+
if err := trace.Start(r.toRecorder); err != nil {
118+
return err
119+
}
120+
121+
// Start recorder goroutine.
122+
r.recorderWait.Add(1)
123+
go func() {
124+
defer r.recorderWait.Done()
125+
126+
// Read in the header so we can tack it on to the front
127+
// of whatever WriteTo emits later.
128+
_, err := io.ReadFull(r.fromTracer, r.header[:])
129+
if err != nil {
130+
r.err = err
131+
return
132+
}
133+
134+
// Process the rest of the trace.
135+
rd := bufio.NewReader(r.fromTracer)
136+
for {
137+
b, gen, err := readBatch(rd)
138+
if err == io.EOF || err == io.ErrClosedPipe {
139+
break
140+
}
141+
if err != nil {
142+
r.err = err
143+
return
144+
}
145+
146+
// Check if we're entering a new generation.
147+
if r.active.gen != 0 && r.active.gen+1 == gen {
148+
r.ringMu.Lock()
149+
150+
// Validate r.active.freq before we use it. It's required for a generation
151+
// to not be considered broken, and without it, we can't correctly handle
152+
// SetPeriod.
153+
if r.active.freq == 0 {
154+
r.err = fmt.Errorf("broken trace: failed to find frequency event in generation %d", r.active.gen)
155+
return
156+
}
157+
158+
// Get the current trace clock time.
159+
now := traceTimeNow(r.active.freq)
160+
161+
// Add the current generation to the ring. Make sure we always have at least one
162+
// complete generation by putting the active generation onto the new list, regardless
163+
// of whatever our settings are.
164+
//
165+
// N.B. Let's completely replace the ring here, so that WriteTo can just make a copy
166+
// and not worry about aliasing. This creates allocations, but at a very low rate.
167+
newRing := []rawGeneration{r.active}
168+
size := r.active.size
169+
for i := len(r.ring) - 1; i >= 0; i-- {
170+
// Stop adding older generations if the new ring already exceeds the thresholds.
171+
// This ensures we keep generations that cross a threshold, but not any that lie
172+
// entirely outside it.
173+
if size > r.wantSize || now.Sub(newRing[len(newRing)-1].minTraceTime()) > r.wantDur {
174+
break
175+
}
176+
size += r.ring[i].size
177+
newRing = append(newRing, r.ring[i])
178+
}
179+
slices.Reverse(newRing)
180+
r.ring = newRing
181+
r.ringMu.Unlock()
182+
183+
// Start a new active generation.
184+
r.active = rawGeneration{}
185+
}
186+
187+
// Obtain the frequency if this is a frequency batch.
188+
if b.isFreqBatch() {
189+
freq, err := parseFreq(b)
190+
if err != nil {
191+
r.err = err
192+
return
193+
}
194+
r.active.freq = freq
195+
}
196+
197+
// Append the batch to the current generation.
198+
if r.active.gen == 0 {
199+
r.active.gen = gen
200+
}
201+
if r.active.minTime == 0 || r.active.minTime > b.time {
202+
r.active.minTime = b.time
203+
}
204+
r.active.size += 1
205+
r.active.size += uvarintSize(gen)
206+
r.active.size += uvarintSize(uint64(b.m))
207+
r.active.size += uvarintSize(uint64(b.time))
208+
r.active.size += uvarintSize(uint64(len(b.data)))
209+
r.active.size += len(b.data)
210+
r.active.batches = append(r.active.batches, b)
211+
}
212+
}()
213+
214+
r.enabled = true
215+
return nil
216+
}
217+
218+
// Stop ends flight recording. It waits until any concurrent [FlightRecorder.WriteTo] calls exit.
219+
// Returns an error if the flight recorder is inactive.
220+
func (r *FlightRecorder) Stop() error {
221+
if !r.enabled {
222+
return fmt.Errorf("cannot disable a disabled flight recorder")
223+
}
224+
r.enabled = false
225+
trace.Stop()
226+
227+
// Close the write side of the pipe. This is safe because tracing has stopped, so no more will
228+
// be written to the pipe.
229+
r.fromTracer.Close()
230+
231+
// Wait for the reader to exit.
232+
r.recorderWait.Wait()
233+
234+
// Reset all state. No need to lock because the reader has already exited.
235+
r.active = rawGeneration{}
236+
r.ring = nil
237+
r.toRecorder.Close()
238+
r.fromTracer.Close()
239+
return r.err
240+
}
241+
242+
// Enabled returns true if the flight recorder is active. Specifically, it will return true if
243+
// Start did not return an error, and Stop has not yet been called.
244+
// It is safe to call from multiple goroutines simultaneously.
245+
func (r *FlightRecorder) Enabled() bool {
246+
return r.enabled
247+
}
248+
249+
// ErrSnapshotActive indicates that a call to WriteTo was made while one was already in progress.
250+
// If the caller of WriteTo sees this error, they should use the result from the other call to WriteTo.
251+
var ErrSnapshotActive = fmt.Errorf("call to WriteTo for trace.FlightRecorder already in progress")
252+
253+
// WriteTo takes a snapshots of the circular buffer's contents and writes the execution data to w.
254+
// Returns the number of bytes written and an error.
255+
// An error is returned upon failure to write to w or if the flight recorder is inactive.
256+
// Only one goroutine may execute WriteTo at a time, but it is safe to call from multiple goroutines.
257+
// If a goroutine calls WriteTo while another goroutine is currently executing it, WriteTo will return
258+
// ErrSnapshotActive to that goroutine.
259+
func (r *FlightRecorder) WriteTo(w io.Writer) (total int, err error) {
260+
if !r.enabled {
261+
return 0, fmt.Errorf("cannot snapshot a disabled flight recorder")
262+
}
263+
if !r.writing.TryLock() {
264+
return 0, ErrSnapshotActive
265+
}
266+
defer r.writing.Unlock()
267+
268+
// Force a global buffer flush twice.
269+
//
270+
// This is pretty unfortunate, but because the signal that a generation is done is that a new
271+
// generation appears in the trace *or* the trace stream ends, the recorder goroutine will
272+
// have no idea when to add a generation to the ring if we just flush once. If we flush twice,
273+
// at least the first one will end up on the ring, which is the one we wanted anyway.
274+
//
275+
// In a runtime-internal implementation this is a non-issue. The runtime is fully aware
276+
// of what generations are complete, so only one flush is necessary.
277+
runtime_traceAdvance(false)
278+
runtime_traceAdvance(false)
279+
280+
// Now that everything has been flushed and written, grab whatever we have.
281+
//
282+
// N.B. traceAdvance blocks until the tracer goroutine has actually written everything
283+
// out, which means the generation we just flushed must have been already been observed
284+
// by the recorder goroutine. Because we flushed twice, the first flush is guaranteed to
285+
// have been both completed *and* processed by the recorder goroutine.
286+
r.ringMu.Lock()
287+
gens := r.ring
288+
r.ringMu.Unlock()
289+
290+
// Write the header.
291+
total, err = w.Write(r.header[:])
292+
if err != nil {
293+
return total, err
294+
}
295+
296+
// Helper for writing varints.
297+
var varintBuf [binary.MaxVarintLen64]byte
298+
writeUvarint := func(u uint64) error {
299+
v := binary.PutUvarint(varintBuf[:], u)
300+
n, err := w.Write(varintBuf[:v])
301+
total += n
302+
return err
303+
}
304+
305+
// Write all the data.
306+
for _, gen := range gens {
307+
for _, batch := range gen.batches {
308+
// Rewrite the batch header event with four arguments: gen, M ID, timestamp, and data length.
309+
n, err := w.Write([]byte{byte(go122.EvEventBatch)})
310+
total += n
311+
if err != nil {
312+
return total, err
313+
}
314+
if err := writeUvarint(gen.gen); err != nil {
315+
return total, err
316+
}
317+
if err := writeUvarint(uint64(batch.m)); err != nil {
318+
return total, err
319+
}
320+
if err := writeUvarint(uint64(batch.time)); err != nil {
321+
return total, err
322+
}
323+
if err := writeUvarint(uint64(len(batch.data))); err != nil {
324+
return total, err
325+
}
326+
327+
// Write batch data.
328+
n, err = w.Write(batch.data)
329+
total += n
330+
if err != nil {
331+
return total, err
332+
}
333+
}
334+
}
335+
return total, nil
336+
}
337+
338+
type rawGeneration struct {
339+
gen uint64
340+
size int
341+
minTime timestamp
342+
freq frequency
343+
batches []batch
344+
}
345+
346+
func (r *rawGeneration) minTraceTime() Time {
347+
return r.freq.mul(r.minTime)
348+
}
349+
350+
func traceTimeNow(freq frequency) Time {
351+
// TODO(mknyszek): It's unfortunate that we have to rely on runtime-internal details
352+
// like this. This would be better off in the runtime.
353+
return freq.mul(timestamp(runtime_traceClockNow()))
354+
}
355+
356+
func uvarintSize(x uint64) int {
357+
return 1 + bits.Len64(x)/7
358+
}
359+
360+
//go:linkname runtime_traceAdvance runtime.traceAdvance
361+
func runtime_traceAdvance(stopTrace bool)
362+
363+
//go:linkname runtime_traceClockNow runtime.traceClockNow
364+
func runtime_traceClockNow() int64

0 commit comments

Comments
 (0)