Reduce api/sessionrecording's Reader API and misc. improvements #50883

Open

wants to merge 24 commits into base: dustin.specker/move-proto-reader-to-api

Commits (24)
7202af8
refactor(api/sessionrecording): implement slog.LogValuer for ReaderStats
dustinspecker Dec 23, 2024
cb0aae4
docs(api/sessionrecording): correct godoc for gzipReader
dustinspecker Dec 31, 2024
7954041
docs(api/sessionrecording): remove useless Close godoc
dustinspecker Dec 31, 2024
8e23da9
refactor(api/sessionrecording): remove useless iota values
dustinspecker Dec 31, 2024
dc9e844
docs(api/sessionrecording): fix godoc for LogValue
dustinspecker Dec 31, 2024
3dd0bd4
refactor(api/sessionrecording): move sizeBytes and messageBytes off o…
dustinspecker Dec 31, 2024
ed211bd
feat(api/sessionrecording): track skipped bytes on ReaderStats
dustinspecker Dec 31, 2024
4659417
refactor(api/sessionrecording): newGzipReader takes io.Reader
dustinspecker Dec 31, 2024
c5089c8
refactor(api/sessionrecording): remove gzipReader
dustinspecker Dec 31, 2024
ade725f
docs(sessionrecording): clarify message about Close on NewReader
dustinspecker Jan 2, 2025
dd244f4
refactor(lib/events): newGzipWriter takes io.Writer
dustinspecker Jan 2, 2025
ac513e8
refactor(api/sessionrecording): use better names for Reader fields
dustinspecker Jan 2, 2025
9b96888
docs(api/sessionrecording): add godoc for Reader fields
dustinspecker Jan 2, 2025
36ea143
refactor(api/sessionrecording): remove Reader.Reset
dustinspecker Jan 2, 2025
cb2371e
refactor(lib/events/filesessions): use slices.CompactFunc
dustinspecker Jan 2, 2025
a61e117
fix(lib/events/filesessions): replace int64 conversion with comparison
dustinspecker Jan 2, 2025
11d6eab
refactor(lib/events/filesessions): use SortFunc instead of SortStable…
dustinspecker Jan 2, 2025
e5f508d
test(lib/events): combine uploaded parts to be read by single Reader
dustinspecker Jan 8, 2025
41b2085
fix(api/sessionrecording): use underscores in slog key names
dustinspecker Jan 8, 2025
01ce51f
refactor(api/sessionrecording): scope checkpointIteration to for loop
dustinspecker Jan 8, 2025
f3a81f1
refactor(api/sessionrecording): reduce scope of err vars
dustinspecker Jan 8, 2025
09b0884
refactor(api/sessionrecording): avoid creating empty struct
dustinspecker Jan 8, 2025
4ca0689
refactor(api/sessionrecording): move size and message bytes to Reader…
dustinspecker Jan 8, 2025
e93d2c7
docs(lib/events): remove untrue comment
dustinspecker Jan 8, 2025
62 changes: 0 additions & 62 deletions api/sessionrecording/gzip_reader.go

This file was deleted.

117 changes: 60 additions & 57 deletions api/sessionrecording/session_recording.go
Expand Up @@ -21,6 +21,7 @@
package sessionrecording

import (
"compress/gzip"
"context"
"encoding/binary"
"errors"
Expand Down Expand Up @@ -52,41 +53,61 @@ const (
)

// NewReader returns a new session recording reader
//
// It is the caller's responsibility to call Close on the returned [Reader] from NewReader when done
// with the [Reader].
func NewReader(r io.Reader) *Reader {
return &Reader{
reader: r,
rawReader: r,
lastIndex: -1,
}
}
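A minimal usage sketch of the NewReader/Close contract described above (readRecording and sessionPath are illustrative assumptions, not part of this package; the usual imports for os, context, sessionrecording, and apievents are assumed):

func readRecording(ctx context.Context, sessionPath string) ([]apievents.AuditEvent, error) {
	f, err := os.Open(sessionPath)
	if err != nil {
		return nil, err
	}
	defer f.Close()

	reader := sessionrecording.NewReader(f)
	// The caller is responsible for closing the Reader when done.
	defer reader.Close()

	return reader.ReadAll(ctx)
}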

const (
// protoReaderStateInit is ready to start reading the next part
protoReaderStateInit = 0
protoReaderStateInit = iota
// protoReaderStateCurrent will read the data from the current part
protoReaderStateCurrent = iota
protoReaderStateCurrent
// protoReaderStateEOF indicates that reader has completed reading
// all parts
protoReaderStateEOF = iota
protoReaderStateEOF
// protoReaderStateError indicates that reader has reached internal
// error and should close
protoReaderStateError = iota
protoReaderStateError
)

// Reader reads Teleport's session recordings
type Reader struct {
gzipReader *gzipReader
padding int64
reader io.Reader
sizeBytes [Int64Size]byte
// partReader wraps rawReader and is limited to reading a single
// (compressed) part from the session recording
partReader io.Reader
// gzipReader wraps partReader and decompresses a single part
// from the session recording
gzipReader *gzip.Reader
// padding is how many bytes were added to hit a minimum file upload size
padding int64
// rawReader is the raw data source we read from
rawReader io.Reader
// sizeBytes is used to hold the header of the current event being parsed
sizeBytes [Int64Size]byte
// messageBytes holds the current decompressed event being parsed
messageBytes [MaxProtoMessageSizeBytes]byte
state int
error error
lastIndex int64
stats ReaderStats
// state tracks where the Reader is at in consuming a session recording
state int
// error holds any error encountered while reading a session recording
error error
// lastIndex stores the last parsed event's index within a session (events found with an index less than or equal to lastIndex are skipped)
lastIndex int64
// stats contains info about processed events (e.g. total events processed, how many events were skipped)
stats ReaderStats
}
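For orientation, a sketch of how these fields relate while consuming one part. This paraphrases the Read loop shown further down and is not literal code from this PR (compress/gzip and io imports assumed):

// wrapPart shows the layering: rawReader -> partReader -> gzipReader.
func wrapPart(rawReader io.Reader, partSize int64) (*gzip.Reader, error) {
	partReader := io.LimitReader(rawReader, partSize) // bound reads to one compressed part
	zr, err := gzip.NewReader(partReader)             // decompress that single part
	if err != nil {
		return nil, err
	}
	zr.Multistream(false) // halt at the end of the single valid gzip entry
	return zr, nil
}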

// ReaderStats contains some reader statistics
type ReaderStats struct {
// SkippedBytes counts bytes that were skipped during processing,
// typically due to a bug in older Teleport versions that wrote
// padding bytes into the gzip section.
SkippedBytes int64
// SkippedEvents is a counter of encountered
// events that were recorded several times or
// were out of order and were therefore skipped
Expand All @@ -99,13 +120,14 @@ type ReaderStats struct {
TotalEvents int64
}

// ToFields returns a copy of the stats to be used as log fields
func (p ReaderStats) ToFields() map[string]any {
return map[string]any{
"skipped-events": p.SkippedEvents,
"out-of-order-events": p.OutOfOrderEvents,
"total-events": p.TotalEvents,
}
// LogValue returns a copy of the stats to be used as log fields
func (p ReaderStats) LogValue() slog.Value {
return slog.GroupValue(
slog.Int64("skipped_bytes", p.SkippedBytes),
slog.Int64("skipped_events", p.SkippedEvents),
slog.Int64("out_of_order_events", p.OutOfOrderEvents),
slog.Int64("total_events", p.TotalEvents),
)
}
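Because ReaderStats now implements slog.LogValuer, it can be passed directly as a log attribute and is expanded lazily into the group above. A small usage sketch, assuming a reader and ctx are in scope:

stats := reader.GetStats()
slog.InfoContext(ctx, "finished reading session recording", "stats", stats)
// With the default text handler this renders roughly as (values illustrative):
//   stats.skipped_bytes=0 stats.skipped_events=2 stats.out_of_order_events=0 stats.total_events=40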

// Close releases reader resources
@dustinspecker (Author) commented on Jan 8, 2025:
rosstimothy brought up the following on the original pull request.

"Is is possible that gzipReader is assigned after the check on line 129? For instance what if Read is being executed in another goroutine?"

Expand All @@ -116,24 +138,6 @@ func (r *Reader) Close() error {
return nil
}

// Reset sets reader to read from the new reader
// without resetting the stats, could be used
// to deduplicate the events
func (r *Reader) Reset(reader io.Reader) error {
if r.error != nil {
return r.error
}
if r.gzipReader != nil {
if r.error = r.gzipReader.Close(); r.error != nil {
return trace.Wrap(r.error)
}
r.gzipReader = nil
}
r.reader = reader
r.state = protoReaderStateInit
return nil
}
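With Reset removed, callers that previously re-pointed a Reader at each part can concatenate the sources up front instead, as the updated tests below do. A sketch assuming parts holds the uploaded byte slices:

readers := make([]io.Reader, 0, len(parts))
for _, part := range parts {
	readers = append(readers, bytes.NewReader(part))
}
reader := sessionrecording.NewReader(io.MultiReader(readers...))
defer reader.Close()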

func (r *Reader) setError(err error) error {
r.state = protoReaderStateError
r.error = err
Expand All @@ -151,9 +155,7 @@ func (r *Reader) Read(ctx context.Context) (apievents.AuditEvent, error) {
// is an extra precaution to avoid
// accidental endless loop due to logic error crashing the system
// and allows ctx timeout to kick in if specified
var checkpointIteration int64
for {
checkpointIteration++
for checkpointIteration := int64(1); ; checkpointIteration++ {
if checkpointIteration%maxIterationLimit == 0 {
select {
case <-ctx.Done():
Expand All @@ -172,8 +174,7 @@ func (r *Reader) Read(ctx context.Context) (apievents.AuditEvent, error) {
case protoReaderStateInit:
// read the part header that consists of the protocol version
// and the part size (for the V1 version of the protocol)
_, err := io.ReadFull(r.reader, r.sizeBytes[:Int64Size])
if err != nil {
if _, err := io.ReadFull(r.rawReader, r.sizeBytes[:Int64Size]); err != nil {
// reached the end of the stream
if errors.Is(err, io.EOF) {
r.state = protoReaderStateEOF
Expand All @@ -186,43 +187,47 @@ func (r *Reader) Read(ctx context.Context) (apievents.AuditEvent, error) {
return nil, trace.BadParameter("unsupported protocol version %v", protocolVersion)
}
// read size of this gzipped part as encoded by V1 protocol version
_, err = io.ReadFull(r.reader, r.sizeBytes[:Int64Size])
if err != nil {
if _, err := io.ReadFull(r.rawReader, r.sizeBytes[:Int64Size]); err != nil {
return nil, r.setError(trace.ConvertSystemError(err))
}
partSize := binary.BigEndian.Uint64(r.sizeBytes[:Int64Size])
// read padding size (could be 0)
_, err = io.ReadFull(r.reader, r.sizeBytes[:Int64Size])
if err != nil {
if _, err := io.ReadFull(r.rawReader, r.sizeBytes[:Int64Size]); err != nil {
return nil, r.setError(trace.ConvertSystemError(err))
}
r.padding = int64(binary.BigEndian.Uint64(r.sizeBytes[:Int64Size]))
gzipReader, err := newGzipReader(io.NopCloser(io.LimitReader(r.reader, int64(partSize))))
r.partReader = io.LimitReader(r.rawReader, int64(partSize))
gzipReader, err := gzip.NewReader(r.partReader)
// older bugged versions of teleport would sometimes incorrectly inject padding bytes into
// the gzip section of the archive. this causes gzip readers with multistream enabled (the
// default behavior) to fail. we disable multistream here in order to ensure that the gzip
// reader halts when it reaches the end of the current (only) valid gzip entry.
if err != nil {
return nil, r.setError(trace.Wrap(err))
}
gzipReader.Multistream(false)
r.gzipReader = gzipReader
r.state = protoReaderStateCurrent
continue
// read the next version from the gzip reader
case protoReaderStateCurrent:
// the record consists of length of the protobuf encoded
// message and the message itself
_, err := io.ReadFull(r.gzipReader, r.sizeBytes[:Int32Size])
if err != nil {
if _, err := io.ReadFull(r.gzipReader, r.sizeBytes[:Int32Size]); err != nil {
if !errors.Is(err, io.EOF) {
return nil, r.setError(trace.ConvertSystemError(err))
}

// due to a bug in older versions of teleport it was possible that padding
// bytes would end up inside of the gzip section of the archive. we should
// skip any dangling data in the gzip section.
n, err := io.CopyBuffer(io.Discard, r.gzipReader.inner, r.messageBytes[:])
n, err := io.CopyBuffer(io.Discard, r.partReader, r.messageBytes[:])
if err != nil {
return nil, r.setError(trace.ConvertSystemError(err))
}

if n != 0 {
r.stats.SkippedBytes += n
// log the number of bytes that were skipped
slog.DebugContext(ctx, "skipped dangling data in session recording section", "length", n)
}
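For reference, each V1 part begins with three big-endian 8-byte values (protocol version, gzipped part size, padding size) ahead of the gzip payload, as the protoReaderStateInit case above reads them. A hedged sketch of parsing just that header, assuming Int64Size is 8 (encoding/binary and io imports assumed):

func readPartHeader(r io.Reader) (version, partSize, padding uint64, err error) {
	var buf [8]byte
	for _, dst := range []*uint64{&version, &partSize, &padding} {
		if _, err = io.ReadFull(r, buf[:]); err != nil {
			return 0, 0, 0, err
		}
		*dst = binary.BigEndian.Uint64(buf[:])
	}
	return version, partSize, padding, nil
}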
Expand All @@ -233,7 +238,7 @@ func (r *Reader) Read(ctx context.Context) (apievents.AuditEvent, error) {
return nil, r.setError(trace.ConvertSystemError(err))
}
if r.padding != 0 {
skipped, err := io.CopyBuffer(io.Discard, io.LimitReader(r.reader, r.padding), r.messageBytes[:])
skipped, err := io.CopyBuffer(io.Discard, io.LimitReader(r.rawReader, r.padding), r.messageBytes[:])
if err != nil {
return nil, r.setError(trace.ConvertSystemError(err))
}
Expand All @@ -255,13 +260,11 @@ func (r *Reader) Read(ctx context.Context) (apievents.AuditEvent, error) {
if messageSize == 0 {
return nil, r.setError(trace.BadParameter("unexpected message size 0"))
}
_, err = io.ReadFull(r.gzipReader, r.messageBytes[:messageSize])
if err != nil {
if _, err := io.ReadFull(r.gzipReader, r.messageBytes[:messageSize]); err != nil {
return nil, r.setError(trace.ConvertSystemError(err))
}
oneof := apievents.OneOf{}
err = oneof.Unmarshal(r.messageBytes[:messageSize])
if err != nil {
var oneof apievents.OneOf
if err := oneof.Unmarshal(r.messageBytes[:messageSize]); err != nil {
return nil, trace.Wrap(err)
}
event, err := apievents.FromOneOf(oneof)
Expand Down
32 changes: 19 additions & 13 deletions lib/events/filesessions/fileasync_test.go
Expand Up @@ -660,25 +660,31 @@ func emitStream(ctx context.Context, t *testing.T, streamer events.Streamer, inE
}

// readStream reads and decodes the audit stream from uploadID
// Additionally, readStream will remove any duplicate audit events (by index) encountered.
func readStream(ctx context.Context, t *testing.T, uploadID string, uploader *eventstest.MemoryUploader) []apievents.AuditEvent {
t.Helper()

parts, err := uploader.GetParts(uploadID)
require.NoError(t, err)

var outEvents []apievents.AuditEvent
var reader *sessionrecording.Reader
for i, part := range parts {
if i == 0 {
reader = sessionrecording.NewReader(bytes.NewReader(part))
} else {
err := reader.Reset(bytes.NewReader(part))
require.NoError(t, err)
}
out, err := reader.ReadAll(ctx)
require.NoError(t, err, "part crash %#v", part)

outEvents = append(outEvents, out...)
// combine all uploaded parts to create the session recording content
var sessionRecordingContent bytes.Buffer
for _, part := range parts {
bytesWritten, err := sessionRecordingContent.Write(part)
require.NoError(t, err, "error writing part bytes to session recording content")
require.Equal(t, len(part), bytesWritten, "not all bytes were written to session recording content")
}

// Note: it is possible for duplicate event indices to be encountered in cases where the upload process
// encounters an error such as the connection being terminated, since the upload process will retry uploading
// those events for a successful upload. This is not an issue because the session recording reader knows to drop
// events with an event index that has already been read.
reader := sessionrecording.NewReader(&sessionRecordingContent)

outEvents, err := reader.ReadAll(ctx)
require.NoError(t, err, "error reading all session recording content")

require.NoError(t, reader.Close(), "error closing session recording reader")

return outEvents
}
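The deduplication that note relies on is the Reader's lastIndex check (see the struct godoc earlier). Conceptually, the skip rule inside Read looks like this paraphrase (not literal code from this PR; the real implementation also tracks OutOfOrderEvents separately):

if event.GetIndex() <= r.lastIndex {
	r.stats.SkippedEvents++ // drop duplicates and out-of-order events
	continue
}
r.lastIndex = event.GetIndex()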
2 changes: 1 addition & 1 deletion lib/events/session_writer_test.go
Expand Up @@ -424,7 +424,7 @@ func (a *sessionWriterTest) collectEvents(t *testing.T) []apievents.AuditEvent {
reader := sessionrecording.NewReader(io.MultiReader(readers...))
outEvents, err := reader.ReadAll(a.ctx)
require.NoError(t, err, "failed to read")
t.Logf("Reader stats :%v", reader.GetStats().ToFields())
t.Logf("Reader stats: %v", reader.GetStats().LogValue())

return outEvents
}
Expand Down
13 changes: 5 additions & 8 deletions lib/events/sessionlog.go
Expand Up @@ -26,13 +26,13 @@ import (
"github.com/gravitational/trace"
)

// gzipWriter wraps file, on close close both gzip writer and file
// gzipWriter wraps an io.Writer with a gzip.Writer; on close, only the gzip writer is closed
type gzipWriter struct {
*gzip.Writer
inner io.WriteCloser
inner io.Writer
}

// Close closes gzip writer and file
// Close closes the gzip writer and returns it to the writer pool
func (f *gzipWriter) Close() error {
var errors []error
if f.Writer != nil {
Expand All @@ -41,10 +41,7 @@ func (f *gzipWriter) Close() error {
writerPool.Put(f.Writer)
f.Writer = nil
}
if f.inner != nil {
errors = append(errors, f.inner.Close())
f.inner = nil
}
f.inner = nil
return trace.NewAggregate(errors...)
}

Expand All @@ -59,7 +56,7 @@ var writerPool = sync.Pool{
},
}

func newGzipWriter(writer io.WriteCloser) *gzipWriter {
func newGzipWriter(writer io.Writer) *gzipWriter {
g := writerPool.Get().(*gzip.Writer)
g.Reset(writer)
return &gzipWriter{
Expand Down
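The pool round-trip pairs newGzipWriter (Get plus Reset) with Close (flush and Put). A usage sketch, where payload is an assumed byte slice and the inner writer is intentionally left open:

var buf bytes.Buffer
zw := newGzipWriter(&buf) // any io.Writer now works; the inner writer is never closed
if _, err := zw.Write(payload); err != nil {
	return err
}
// Close flushes the gzip stream and returns the *gzip.Writer to writerPool.
if err := zw.Close(); err != nil {
	return err
}
// buf now holds the gzip-compressed payload; buf itself was never closed.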