From 7202af8a290d043a6bea542164d32c1a9ac42800 Mon Sep 17 00:00:00 2001 From: Dustin Specker Date: Mon, 23 Dec 2024 09:58:24 -0600 Subject: [PATCH 01/24] refactor(api/sessionrecording): implement slog.LogValuer for ReaderStats --- api/sessionrecording/session_recording.go | 12 ++++++------ lib/events/session_writer_test.go | 2 +- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/api/sessionrecording/session_recording.go b/api/sessionrecording/session_recording.go index 1f452cc119674..cedb97b910e0d 100644 --- a/api/sessionrecording/session_recording.go +++ b/api/sessionrecording/session_recording.go @@ -100,12 +100,12 @@ type ReaderStats struct { } // ToFields returns a copy of the stats to be used as log fields -func (p ReaderStats) ToFields() map[string]any { - return map[string]any{ - "skipped-events": p.SkippedEvents, - "out-of-order-events": p.OutOfOrderEvents, - "total-events": p.TotalEvents, - } +func (p ReaderStats) LogValue() slog.Value { + return slog.GroupValue( + slog.Int64("skipped-events", p.SkippedEvents), + slog.Int64("out-of-order-events", p.OutOfOrderEvents), + slog.Int64("total-events", p.TotalEvents), + ) } // Close releases reader resources diff --git a/lib/events/session_writer_test.go b/lib/events/session_writer_test.go index b6466b65af5e9..5576485c2e743 100644 --- a/lib/events/session_writer_test.go +++ b/lib/events/session_writer_test.go @@ -424,7 +424,7 @@ func (a *sessionWriterTest) collectEvents(t *testing.T) []apievents.AuditEvent { reader := sessionrecording.NewReader(io.MultiReader(readers...)) outEvents, err := reader.ReadAll(a.ctx) require.NoError(t, err, "failed to read") - t.Logf("Reader stats :%v", reader.GetStats().ToFields()) + t.Logf("Reader stats :%v", reader.GetStats().LogValue()) return outEvents } From cb0aae41888a0fb4c4bbe18d89eba69e9b466ccb Mon Sep 17 00:00:00 2001 From: Dustin Specker Date: Tue, 31 Dec 2024 14:20:56 -0600 Subject: [PATCH 02/24] docs(api/sessionrecording): correct godoc for gzipReader --- 
api/sessionrecording/gzip_reader.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/api/sessionrecording/gzip_reader.go b/api/sessionrecording/gzip_reader.go index 79c71e49f8fc6..5992c5d420cb7 100644 --- a/api/sessionrecording/gzip_reader.go +++ b/api/sessionrecording/gzip_reader.go @@ -25,7 +25,7 @@ import ( "github.com/gravitational/trace" ) -// gzipReader wraps file, on close close both gzip writer and file +// gzipReader wraps an [io.ReadCloser], and closes both the gzip reader and underlying reader type gzipReader struct { io.ReadCloser inner io.ReadCloser From 79540416cb2bb01ea9e0f8a3b2e2d77a2e192654 Mon Sep 17 00:00:00 2001 From: Dustin Specker Date: Tue, 31 Dec 2024 14:21:31 -0600 Subject: [PATCH 03/24] docs(api/sessionrecording): remove useless Close godoc --- api/sessionrecording/gzip_reader.go | 1 - 1 file changed, 1 deletion(-) diff --git a/api/sessionrecording/gzip_reader.go b/api/sessionrecording/gzip_reader.go index 5992c5d420cb7..f3ef1481a61ec 100644 --- a/api/sessionrecording/gzip_reader.go +++ b/api/sessionrecording/gzip_reader.go @@ -31,7 +31,6 @@ type gzipReader struct { inner io.ReadCloser } -// Close closes file and gzip writer func (f *gzipReader) Close() error { var errors []error if f.ReadCloser != nil { From 8e23da9fbb5f3426cbed496a664d9d6bfbf4e5f8 Mon Sep 17 00:00:00 2001 From: Dustin Specker Date: Tue, 31 Dec 2024 14:26:53 -0600 Subject: [PATCH 04/24] refactor(api/sessionrecording): remove useless iota values --- api/sessionrecording/session_recording.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/api/sessionrecording/session_recording.go b/api/sessionrecording/session_recording.go index cedb97b910e0d..c5a6023385a09 100644 --- a/api/sessionrecording/session_recording.go +++ b/api/sessionrecording/session_recording.go @@ -61,15 +61,15 @@ func NewReader(r io.Reader) *Reader { const ( // protoReaderStateInit is ready to start reading the next part - protoReaderStateInit = 0 + 
protoReaderStateInit = iota // protoReaderStateCurrent will read the data from the current part - protoReaderStateCurrent = iota + protoReaderStateCurrent // protoReaderStateEOF indicates that reader has completed reading // all parts - protoReaderStateEOF = iota + protoReaderStateEOF // protoReaderStateError indicates that reader has reached internal // error and should close - protoReaderStateError = iota + protoReaderStateError ) // Reader reads Teleport's session recordings From dc9e84404011ce8fa2a20f3ce52f3a714cc2601d Mon Sep 17 00:00:00 2001 From: Dustin Specker Date: Tue, 31 Dec 2024 14:37:29 -0600 Subject: [PATCH 05/24] docs(api/sessionrecording): fix godoc for LogValue --- api/sessionrecording/session_recording.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/api/sessionrecording/session_recording.go b/api/sessionrecording/session_recording.go index c5a6023385a09..65d5a26eb852f 100644 --- a/api/sessionrecording/session_recording.go +++ b/api/sessionrecording/session_recording.go @@ -99,7 +99,7 @@ type ReaderStats struct { TotalEvents int64 } -// ToFields returns a copy of the stats to be used as log fields +// LogValue returns a copy of the stats to be used as log fields func (p ReaderStats) LogValue() slog.Value { return slog.GroupValue( slog.Int64("skipped-events", p.SkippedEvents), From 3dd0bd41c322f019791e63183ff243f7bfb3b0fd Mon Sep 17 00:00:00 2001 From: Dustin Specker Date: Tue, 31 Dec 2024 14:45:29 -0600 Subject: [PATCH 06/24] refactor(api/sessionrecording): move sizeBytes and messageBytes off of Reader --- api/sessionrecording/session_recording.go | 44 ++++++++++++----------- 1 file changed, 23 insertions(+), 21 deletions(-) diff --git a/api/sessionrecording/session_recording.go b/api/sessionrecording/session_recording.go index 65d5a26eb852f..34482b07b8d23 100644 --- a/api/sessionrecording/session_recording.go +++ b/api/sessionrecording/session_recording.go @@ -74,15 +74,13 @@ const ( // Reader reads Teleport's session 
recordings type Reader struct { - gzipReader *gzipReader - padding int64 - reader io.Reader - sizeBytes [Int64Size]byte - messageBytes [MaxProtoMessageSizeBytes]byte - state int - error error - lastIndex int64 - stats ReaderStats + gzipReader *gzipReader + padding int64 + reader io.Reader + state int + error error + lastIndex int64 + stats ReaderStats } // ReaderStats contains some reader statistics @@ -147,6 +145,8 @@ func (r *Reader) GetStats() ReaderStats { // Read returns next event or io.EOF in case of the end of the parts func (r *Reader) Read(ctx context.Context) (apievents.AuditEvent, error) { + var sizeBytes [Int64Size]byte + // periodic checks of context after fixed amount of iterations // is an extra precaution to avoid // accidental endless loop due to logic error crashing the system @@ -172,7 +172,7 @@ func (r *Reader) Read(ctx context.Context) (apievents.AuditEvent, error) { case protoReaderStateInit: // read the part header that consists of the protocol version // and the part size (for the V1 version of the protocol) - _, err := io.ReadFull(r.reader, r.sizeBytes[:Int64Size]) + _, err := io.ReadFull(r.reader, sizeBytes[:Int64Size]) if err != nil { // reached the end of the stream if errors.Is(err, io.EOF) { @@ -181,22 +181,22 @@ func (r *Reader) Read(ctx context.Context) (apievents.AuditEvent, error) { } return nil, r.setError(trace.ConvertSystemError(err)) } - protocolVersion := binary.BigEndian.Uint64(r.sizeBytes[:Int64Size]) + protocolVersion := binary.BigEndian.Uint64(sizeBytes[:Int64Size]) if protocolVersion != ProtoStreamV1 { return nil, trace.BadParameter("unsupported protocol version %v", protocolVersion) } // read size of this gzipped part as encoded by V1 protocol version - _, err = io.ReadFull(r.reader, r.sizeBytes[:Int64Size]) + _, err = io.ReadFull(r.reader, sizeBytes[:Int64Size]) if err != nil { return nil, r.setError(trace.ConvertSystemError(err)) } - partSize := binary.BigEndian.Uint64(r.sizeBytes[:Int64Size]) + partSize := 
binary.BigEndian.Uint64(sizeBytes[:Int64Size]) // read padding size (could be 0) - _, err = io.ReadFull(r.reader, r.sizeBytes[:Int64Size]) + _, err = io.ReadFull(r.reader, sizeBytes[:Int64Size]) if err != nil { return nil, r.setError(trace.ConvertSystemError(err)) } - r.padding = int64(binary.BigEndian.Uint64(r.sizeBytes[:Int64Size])) + r.padding = int64(binary.BigEndian.Uint64(sizeBytes[:Int64Size])) gzipReader, err := newGzipReader(io.NopCloser(io.LimitReader(r.reader, int64(partSize)))) if err != nil { return nil, r.setError(trace.Wrap(err)) @@ -206,9 +206,11 @@ func (r *Reader) Read(ctx context.Context) (apievents.AuditEvent, error) { continue // read the next version from the gzip reader case protoReaderStateCurrent: + var messageBytes [MaxProtoMessageSizeBytes]byte + // the record consists of length of the protobuf encoded // message and the message itself - _, err := io.ReadFull(r.gzipReader, r.sizeBytes[:Int32Size]) + _, err := io.ReadFull(r.gzipReader, sizeBytes[:Int32Size]) if err != nil { if !errors.Is(err, io.EOF) { return nil, r.setError(trace.ConvertSystemError(err)) @@ -217,7 +219,7 @@ func (r *Reader) Read(ctx context.Context) (apievents.AuditEvent, error) { // due to a bug in older versions of teleport it was possible that padding // bytes would end up inside of the gzip section of the archive. we should // skip any dangling data in the gzip secion. 
- n, err := io.CopyBuffer(io.Discard, r.gzipReader.inner, r.messageBytes[:]) + n, err := io.CopyBuffer(io.Discard, r.gzipReader.inner, messageBytes[:]) if err != nil { return nil, r.setError(trace.ConvertSystemError(err)) } @@ -233,7 +235,7 @@ func (r *Reader) Read(ctx context.Context) (apievents.AuditEvent, error) { return nil, r.setError(trace.ConvertSystemError(err)) } if r.padding != 0 { - skipped, err := io.CopyBuffer(io.Discard, io.LimitReader(r.reader, r.padding), r.messageBytes[:]) + skipped, err := io.CopyBuffer(io.Discard, io.LimitReader(r.reader, r.padding), messageBytes[:]) if err != nil { return nil, r.setError(trace.ConvertSystemError(err)) } @@ -247,7 +249,7 @@ func (r *Reader) Read(ctx context.Context) (apievents.AuditEvent, error) { r.state = protoReaderStateInit continue } - messageSize := binary.BigEndian.Uint32(r.sizeBytes[:Int32Size]) + messageSize := binary.BigEndian.Uint32(sizeBytes[:Int32Size]) // zero message size indicates end of the part // that sometimes is present in partially submitted parts // that have to be filled with zeroes for parts smaller @@ -255,12 +257,12 @@ func (r *Reader) Read(ctx context.Context) (apievents.AuditEvent, error) { if messageSize == 0 { return nil, r.setError(trace.BadParameter("unexpected message size 0")) } - _, err = io.ReadFull(r.gzipReader, r.messageBytes[:messageSize]) + _, err = io.ReadFull(r.gzipReader, messageBytes[:messageSize]) if err != nil { return nil, r.setError(trace.ConvertSystemError(err)) } oneof := apievents.OneOf{} - err = oneof.Unmarshal(r.messageBytes[:messageSize]) + err = oneof.Unmarshal(messageBytes[:messageSize]) if err != nil { return nil, trace.Wrap(err) } From ed211bdad5d0ec1fa658e3eb18c7c53306ed5266 Mon Sep 17 00:00:00 2001 From: Dustin Specker Date: Tue, 31 Dec 2024 14:58:26 -0600 Subject: [PATCH 07/24] feat(api/sessionrecording): track skipped bytes on ReaderStats --- api/sessionrecording/session_recording.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git 
a/api/sessionrecording/session_recording.go b/api/sessionrecording/session_recording.go index 34482b07b8d23..8d5d8a9831ed3 100644 --- a/api/sessionrecording/session_recording.go +++ b/api/sessionrecording/session_recording.go @@ -85,6 +85,10 @@ type Reader struct { // ReaderStats contains some reader statistics type ReaderStats struct { + // SkippedBytes is a counter with encountered bytes that have been skipped for processing. + // Typically occurring due to a bug in older Teleport versions having padding bytes + // written to the gzip section. + SkippedBytes int64 // SkippedEvents is a counter with encountered // events recorded several times or events // that have been out of order as skipped @@ -100,6 +104,7 @@ type ReaderStats struct { // LogValue returns a copy of the stats to be used as log fields func (p ReaderStats) LogValue() slog.Value { return slog.GroupValue( + slog.Int64("skipped-bytes", p.SkippedBytes), slog.Int64("skipped-events", p.SkippedEvents), slog.Int64("out-of-order-events", p.OutOfOrderEvents), slog.Int64("total-events", p.TotalEvents), @@ -225,6 +230,7 @@ func (r *Reader) Read(ctx context.Context) (apievents.AuditEvent, error) { } if n != 0 { + r.stats.SkippedBytes += n // log the number of bytes that were skipped slog.DebugContext(ctx, "skipped dangling data in session recording section", "length", n) } From 465941747a303f1e1e218b8902d36b9536ebafef Mon Sep 17 00:00:00 2001 From: Dustin Specker Date: Tue, 31 Dec 2024 16:30:12 -0600 Subject: [PATCH 08/24] refactor(api/sessionrecording): newGzipReader takes io.Reader Before, this was taking an io.ReadCloser, but newGzipReader was always being used by sessionrecording with io.NopCloser. So, just take an io.Reader. 
--- api/sessionrecording/gzip_reader.go | 9 +++------ api/sessionrecording/session_recording.go | 2 +- 2 files changed, 4 insertions(+), 7 deletions(-) diff --git a/api/sessionrecording/gzip_reader.go b/api/sessionrecording/gzip_reader.go index f3ef1481a61ec..0eea861fe221b 100644 --- a/api/sessionrecording/gzip_reader.go +++ b/api/sessionrecording/gzip_reader.go @@ -28,7 +28,7 @@ import ( // gzipReader wraps an [io.ReadCloser], and closes both the gzip reader and underlying reader type gzipReader struct { io.ReadCloser - inner io.ReadCloser + inner io.Reader } func (f *gzipReader) Close() error { @@ -37,14 +37,11 @@ func (f *gzipReader) Close() error { errors = append(errors, f.ReadCloser.Close()) f.ReadCloser = nil } - if f.inner != nil { - errors = append(errors, f.inner.Close()) - f.inner = nil - } + f.inner = nil return trace.NewAggregate(errors...) } -func newGzipReader(reader io.ReadCloser) (*gzipReader, error) { +func newGzipReader(reader io.Reader) (*gzipReader, error) { gzReader, err := gzip.NewReader(reader) if err != nil { return nil, trace.Wrap(err) diff --git a/api/sessionrecording/session_recording.go b/api/sessionrecording/session_recording.go index 8d5d8a9831ed3..562fb768976f9 100644 --- a/api/sessionrecording/session_recording.go +++ b/api/sessionrecording/session_recording.go @@ -202,7 +202,7 @@ func (r *Reader) Read(ctx context.Context) (apievents.AuditEvent, error) { return nil, r.setError(trace.ConvertSystemError(err)) } r.padding = int64(binary.BigEndian.Uint64(sizeBytes[:Int64Size])) - gzipReader, err := newGzipReader(io.NopCloser(io.LimitReader(r.reader, int64(partSize)))) + gzipReader, err := newGzipReader(io.LimitReader(r.reader, int64(partSize))) if err != nil { return nil, r.setError(trace.Wrap(err)) } From c5089c846744e7009813fc98600e48ef79645a92 Mon Sep 17 00:00:00 2001 From: Dustin Specker Date: Tue, 31 Dec 2024 16:56:52 -0600 Subject: [PATCH 09/24] refactor(api/sessionrecording): remove gzipReader After accepting only a io.Reader, 
gzipReader doesn't make much sense anymore. Instead, update sessionrecording.Reader to construct a gzip.Reader with Multistream disabled. --- api/sessionrecording/gzip_reader.go | 58 ----------------------- api/sessionrecording/session_recording.go | 16 +++++-- 2 files changed, 13 insertions(+), 61 deletions(-) delete mode 100644 api/sessionrecording/gzip_reader.go diff --git a/api/sessionrecording/gzip_reader.go b/api/sessionrecording/gzip_reader.go deleted file mode 100644 index 0eea861fe221b..0000000000000 --- a/api/sessionrecording/gzip_reader.go +++ /dev/null @@ -1,58 +0,0 @@ -/* - * Teleport - * Copyright (C) 2025 Gravitational, Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -package sessionrecording - -import ( - "compress/gzip" - "io" - - "github.com/gravitational/trace" -) - -// gzipReader wraps an [io.ReadCloser], and closes both the gzip reader and underlying reader -type gzipReader struct { - io.ReadCloser - inner io.Reader -} - -func (f *gzipReader) Close() error { - var errors []error - if f.ReadCloser != nil { - errors = append(errors, f.ReadCloser.Close()) - f.ReadCloser = nil - } - f.inner = nil - return trace.NewAggregate(errors...) 
-} - -func newGzipReader(reader io.Reader) (*gzipReader, error) { - gzReader, err := gzip.NewReader(reader) - if err != nil { - return nil, trace.Wrap(err) - } - // older bugged versions of teleport would sometimes incorrectly inject padding bytes into - // the gzip section of the archive. this causes gzip readers with multistream enabled (the - // default behavior) to fail. we disable multistream here in order to ensure that the gzip - // reader halts when it reaches the end of the current (only) valid gzip entry. - gzReader.Multistream(false) - return &gzipReader{ - ReadCloser: gzReader, - inner: reader, - }, nil -} diff --git a/api/sessionrecording/session_recording.go b/api/sessionrecording/session_recording.go index 562fb768976f9..10d66e5da9b9c 100644 --- a/api/sessionrecording/session_recording.go +++ b/api/sessionrecording/session_recording.go @@ -21,6 +21,7 @@ package sessionrecording import ( + "compress/gzip" "context" "encoding/binary" "errors" @@ -52,6 +53,8 @@ const ( ) // NewReader returns a new session recording reader +// +// It is the caller's responsibility to call Close on the [Reader] when done. func NewReader(r io.Reader) *Reader { return &Reader{ reader: r, @@ -74,7 +77,8 @@ const ( // Reader reads Teleport's session recordings type Reader struct { - gzipReader *gzipReader + inner io.Reader + gzipReader *gzip.Reader padding int64 reader io.Reader state int @@ -202,10 +206,16 @@ func (r *Reader) Read(ctx context.Context) (apievents.AuditEvent, error) { return nil, r.setError(trace.ConvertSystemError(err)) } r.padding = int64(binary.BigEndian.Uint64(sizeBytes[:Int64Size])) - gzipReader, err := newGzipReader(io.LimitReader(r.reader, int64(partSize))) + r.inner = io.LimitReader(r.reader, int64(partSize)) + gzipReader, err := gzip.NewReader(r.inner) + // older bugged versions of teleport would sometimes incorrectly inject padding bytes into + // the gzip section of the archive. 
this causes gzip readers with multistream enabled (the + // default behavior) to fail. we disable multistream here in order to ensure that the gzip + // reader halts when it reaches the end of the current (only) valid gzip entry. if err != nil { return nil, r.setError(trace.Wrap(err)) } + gzipReader.Multistream(false) r.gzipReader = gzipReader r.state = protoReaderStateCurrent continue @@ -224,7 +234,7 @@ func (r *Reader) Read(ctx context.Context) (apievents.AuditEvent, error) { // due to a bug in older versions of teleport it was possible that padding // bytes would end up inside of the gzip section of the archive. we should // skip any dangling data in the gzip secion. - n, err := io.CopyBuffer(io.Discard, r.gzipReader.inner, messageBytes[:]) + n, err := io.CopyBuffer(io.Discard, r.inner, messageBytes[:]) if err != nil { return nil, r.setError(trace.ConvertSystemError(err)) } From ade725f294d80d8a0310c3fbaa84d075284ff070 Mon Sep 17 00:00:00 2001 From: Dustin Specker Date: Thu, 2 Jan 2025 09:28:32 -0600 Subject: [PATCH 10/24] docs(sessionrecording): clarify message about Close on NewReader --- api/sessionrecording/session_recording.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/api/sessionrecording/session_recording.go b/api/sessionrecording/session_recording.go index 10d66e5da9b9c..59e9411be4541 100644 --- a/api/sessionrecording/session_recording.go +++ b/api/sessionrecording/session_recording.go @@ -54,7 +54,8 @@ const ( // NewReader returns a new session recording reader // -// It is the caller's responsibility to call Close on the [Reader] when done. +// It is the caller's responsibility to call Close on the returned [Reader] from NewReader when done +// with the [Reader]. 
func NewReader(r io.Reader) *Reader { return &Reader{ reader: r, From dd244f42db03172f0da8fb075e91670a8900f2d4 Mon Sep 17 00:00:00 2001 From: Dustin Specker Date: Thu, 2 Jan 2025 10:05:24 -0600 Subject: [PATCH 11/24] refactor(lib/events): newGzipWriter takes io.Writer Previously, newGzipWriter took an io.WriteCloser, but the only time newGzipWriter was being used was with a Closer that did nothing. So just take an io.Writer instead. This also enables removing the bufferCloser struct. --- lib/events/sessionlog.go | 13 +++++-------- lib/events/stream.go | 10 +--------- 2 files changed, 6 insertions(+), 17 deletions(-) diff --git a/lib/events/sessionlog.go b/lib/events/sessionlog.go index 78f5e6b81c798..04f49df3703f9 100644 --- a/lib/events/sessionlog.go +++ b/lib/events/sessionlog.go @@ -26,13 +26,13 @@ import ( "github.com/gravitational/trace" ) -// gzipWriter wraps file, on close close both gzip writer and file +// gzipWriter wraps io.Writer and gzip.Writer, on close closes the gzip writer type gzipWriter struct { *gzip.Writer - inner io.WriteCloser + inner io.Writer } -// Close closes gzip writer and file +// Close closes gzip writer func (f *gzipWriter) Close() error { var errors []error if f.Writer != nil { @@ -41,10 +41,7 @@ func (f *gzipWriter) Close() error { writerPool.Put(f.Writer) f.Writer = nil } - if f.inner != nil { - errors = append(errors, f.inner.Close()) - f.inner = nil - } + f.inner = nil return trace.NewAggregate(errors...) 
} @@ -59,7 +56,7 @@ var writerPool = sync.Pool{ }, } -func newGzipWriter(writer io.WriteCloser) *gzipWriter { +func newGzipWriter(writer io.Writer) *gzipWriter { g := writerPool.Get().(*gzip.Writer) g.Reset(writer) return &gzipWriter{ diff --git a/lib/events/stream.go b/lib/events/stream.go index e3bf551802034..00f6530456892 100644 --- a/lib/events/stream.go +++ b/lib/events/stream.go @@ -614,14 +614,6 @@ func (w *sliceWriter) startUploadCurrentSlice() error { return nil } -type bufferCloser struct { - *bytes.Buffer -} - -func (b *bufferCloser) Close() error { - return nil -} - func (w *sliceWriter) newSlice() (*slice, error) { w.lastPartNumber++ // This buffer will be returned to the pool by slice.Close @@ -640,7 +632,7 @@ func (w *sliceWriter) newSlice() (*slice, error) { return &slice{ proto: w.proto, buffer: buffer, - writer: newGzipWriter(&bufferCloser{Buffer: buffer}), + writer: newGzipWriter(buffer), }, nil } From ac513e878d68717e4346b2a201448adfa976ca45 Mon Sep 17 00:00:00 2001 From: Dustin Specker Date: Thu, 2 Jan 2025 10:09:23 -0600 Subject: [PATCH 12/24] refactor(api/sessionrecording): use better names for Reader fields --- api/sessionrecording/session_recording.go | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/api/sessionrecording/session_recording.go b/api/sessionrecording/session_recording.go index 59e9411be4541..a8e296286f83f 100644 --- a/api/sessionrecording/session_recording.go +++ b/api/sessionrecording/session_recording.go @@ -58,7 +58,7 @@ const ( // with the [Reader]. 
func NewReader(r io.Reader) *Reader { return &Reader{ - reader: r, + rawReader: r, lastIndex: -1, } } @@ -78,10 +78,10 @@ const ( // Reader reads Teleport's session recordings type Reader struct { - inner io.Reader + partReader io.Reader gzipReader *gzip.Reader padding int64 - reader io.Reader + rawReader io.Reader state int error error lastIndex int64 @@ -137,7 +137,7 @@ func (r *Reader) Reset(reader io.Reader) error { } r.gzipReader = nil } - r.reader = reader + r.rawReader = reader r.state = protoReaderStateInit return nil } @@ -182,7 +182,7 @@ func (r *Reader) Read(ctx context.Context) (apievents.AuditEvent, error) { case protoReaderStateInit: // read the part header that consists of the protocol version // and the part size (for the V1 version of the protocol) - _, err := io.ReadFull(r.reader, sizeBytes[:Int64Size]) + _, err := io.ReadFull(r.rawReader, sizeBytes[:Int64Size]) if err != nil { // reached the end of the stream if errors.Is(err, io.EOF) { @@ -196,19 +196,19 @@ func (r *Reader) Read(ctx context.Context) (apievents.AuditEvent, error) { return nil, trace.BadParameter("unsupported protocol version %v", protocolVersion) } // read size of this gzipped part as encoded by V1 protocol version - _, err = io.ReadFull(r.reader, sizeBytes[:Int64Size]) + _, err = io.ReadFull(r.rawReader, sizeBytes[:Int64Size]) if err != nil { return nil, r.setError(trace.ConvertSystemError(err)) } partSize := binary.BigEndian.Uint64(sizeBytes[:Int64Size]) // read padding size (could be 0) - _, err = io.ReadFull(r.reader, sizeBytes[:Int64Size]) + _, err = io.ReadFull(r.rawReader, sizeBytes[:Int64Size]) if err != nil { return nil, r.setError(trace.ConvertSystemError(err)) } r.padding = int64(binary.BigEndian.Uint64(sizeBytes[:Int64Size])) - r.inner = io.LimitReader(r.reader, int64(partSize)) - gzipReader, err := gzip.NewReader(r.inner) + r.partReader = io.LimitReader(r.rawReader, int64(partSize)) + gzipReader, err := gzip.NewReader(r.partReader) // older bugged versions of 
teleport would sometimes incorrectly inject padding bytes into // the gzip section of the archive. this causes gzip readers with multistream enabled (the // default behavior) to fail. we disable multistream here in order to ensure that the gzip @@ -235,7 +235,7 @@ func (r *Reader) Read(ctx context.Context) (apievents.AuditEvent, error) { // due to a bug in older versions of teleport it was possible that padding // bytes would end up inside of the gzip section of the archive. we should // skip any dangling data in the gzip secion. - n, err := io.CopyBuffer(io.Discard, r.inner, messageBytes[:]) + n, err := io.CopyBuffer(io.Discard, r.partReader, messageBytes[:]) if err != nil { return nil, r.setError(trace.ConvertSystemError(err)) } @@ -252,7 +252,7 @@ func (r *Reader) Read(ctx context.Context) (apievents.AuditEvent, error) { return nil, r.setError(trace.ConvertSystemError(err)) } if r.padding != 0 { - skipped, err := io.CopyBuffer(io.Discard, io.LimitReader(r.reader, r.padding), messageBytes[:]) + skipped, err := io.CopyBuffer(io.Discard, io.LimitReader(r.rawReader, r.padding), messageBytes[:]) if err != nil { return nil, r.setError(trace.ConvertSystemError(err)) } From 9b9688863ac8b25e7ded111edc2ce4a59c06e118 Mon Sep 17 00:00:00 2001 From: Dustin Specker Date: Thu, 2 Jan 2025 10:13:52 -0600 Subject: [PATCH 13/24] docs(api/sessionrecording): add godoc for Reader fields This is mostly to help clarify what the different readers do. 
--- api/sessionrecording/session_recording.go | 22 ++++++++++++++++------ 1 file changed, 16 insertions(+), 6 deletions(-) diff --git a/api/sessionrecording/session_recording.go b/api/sessionrecording/session_recording.go index a8e296286f83f..9b08cc6e43fb5 100644 --- a/api/sessionrecording/session_recording.go +++ b/api/sessionrecording/session_recording.go @@ -78,14 +78,24 @@ const ( // Reader reads Teleport's session recordings type Reader struct { + // partReader wraps rawReader and is limited to reading a single + // (compressed) part from the session recording partReader io.Reader + // gzipReader wraps partReader and decompresses a single part + // from the session recording gzipReader *gzip.Reader - padding int64 - rawReader io.Reader - state int - error error - lastIndex int64 - stats ReaderStats + // padding is how many bytes were added to hit a minimum file upload size + padding int64 + // rawReader is the raw data source we read from + rawReader io.Reader + // state tracks where the Reader is at in consuming a session recording + state int + // error holds any error encountered while reading a session recording + error error + // lastIndex stores the last parsed event's index within a session (events found with an index less than or equal to lastIndex are skipped) + lastIndex int64 + // stats contains info about processed events (e.g. total events processed, how many events were skipped) + stats ReaderStats } // ReaderStats contains some reader statistics From 36ea1431880fa14574847a10b7f39399da39518a Mon Sep 17 00:00:00 2001 From: Dustin Specker Date: Thu, 2 Jan 2025 12:00:53 -0600 Subject: [PATCH 14/24] refactor(api/sessionrecording): remove Reader.Reset Reader.Reset was only being used by tests. Remove Reset from the API, so it does not need to be supported for consumers. Update tests to not require Reset usage. 
--- api/sessionrecording/session_recording.go | 18 --------- lib/events/filesessions/fileasync_test.go | 46 ++++++++++++++++++----- 2 files changed, 37 insertions(+), 27 deletions(-) diff --git a/api/sessionrecording/session_recording.go b/api/sessionrecording/session_recording.go index 9b08cc6e43fb5..d970ca9740335 100644 --- a/api/sessionrecording/session_recording.go +++ b/api/sessionrecording/session_recording.go @@ -134,24 +134,6 @@ func (r *Reader) Close() error { return nil } -// Reset sets reader to read from the new reader -// without resetting the stats, could be used -// to deduplicate the events -func (r *Reader) Reset(reader io.Reader) error { - if r.error != nil { - return r.error - } - if r.gzipReader != nil { - if r.error = r.gzipReader.Close(); r.error != nil { - return trace.Wrap(r.error) - } - r.gzipReader = nil - } - r.rawReader = reader - r.state = protoReaderStateInit - return nil -} - func (r *Reader) setError(err error) error { r.state = protoReaderStateError r.error = err diff --git a/lib/events/filesessions/fileasync_test.go b/lib/events/filesessions/fileasync_test.go index fa7a23f6b9680..4006c547fc866 100644 --- a/lib/events/filesessions/fileasync_test.go +++ b/lib/events/filesessions/fileasync_test.go @@ -25,6 +25,7 @@ import ( "errors" "os" "path/filepath" + "slices" "sync/atomic" "testing" "time" @@ -660,6 +661,7 @@ func emitStream(ctx context.Context, t *testing.T, streamer events.Streamer, inE } // readStream reads and decodes the audit stream from uploadID +// Additionally, readStream will remove any duplicate audit events (by index) encountered. 
func readStream(ctx context.Context, t *testing.T, uploadID string, uploader *eventstest.MemoryUploader) []apievents.AuditEvent { t.Helper() @@ -667,18 +669,44 @@ func readStream(ctx context.Context, t *testing.T, uploadID string, uploader *ev require.NoError(t, err) var outEvents []apievents.AuditEvent - var reader *sessionrecording.Reader - for i, part := range parts { - if i == 0 { - reader = sessionrecording.NewReader(bytes.NewReader(part)) - } else { - err := reader.Reset(bytes.NewReader(part)) - require.NoError(t, err) - } + for _, part := range parts { + reader := sessionrecording.NewReader(bytes.NewReader(part)) + out, err := reader.ReadAll(ctx) require.NoError(t, err, "part crash %#v", part) + require.NoError(t, reader.Close(), "error closing session recording reader") + outEvents = append(outEvents, out...) } - return outEvents + + // sort audit events by index + slices.SortStableFunc(outEvents, func(a apievents.AuditEvent, b apievents.AuditEvent) int { + return int(a.GetIndex() - b.GetIndex()) + }) + + // remove any audit events with duplicate indexes + return uniqueAuditEvents(outEvents) +} + +// uniqueAuditEvents assumes auditEvents are sorted by index +// +// returned audit events are guaranteed to have unique indexes by removing +// any audit events containing a previously seen index +func uniqueAuditEvents(auditEvents []apievents.AuditEvent) []apievents.AuditEvent { + var uniqAuditEvents []apievents.AuditEvent + + for i, auditEvent := range auditEvents { + // always add first audit event + if i == 0 { + uniqAuditEvents = append(uniqAuditEvents, auditEvent) + continue + } + + if auditEvent.GetIndex() != auditEvents[i-1].GetIndex() { + uniqAuditEvents = append(uniqAuditEvents, auditEvent) + } + } + + return uniqAuditEvents } From cb2371e254157045cb8e565840608fe6d1e87d03 Mon Sep 17 00:00:00 2001 From: Dustin Specker Date: Thu, 2 Jan 2025 16:35:42 -0600 Subject: [PATCH 15/24] refactor(lib/events/filesessions): use slices.CompactFunc --- 
lib/events/filesessions/fileasync_test.go | 26 +++-------------------- 1 file changed, 3 insertions(+), 23 deletions(-) diff --git a/lib/events/filesessions/fileasync_test.go b/lib/events/filesessions/fileasync_test.go index 4006c547fc866..06e3b74965789 100644 --- a/lib/events/filesessions/fileasync_test.go +++ b/lib/events/filesessions/fileasync_test.go @@ -686,27 +686,7 @@ func readStream(ctx context.Context, t *testing.T, uploadID string, uploader *ev }) // remove any audit events with duplicate indexes - return uniqueAuditEvents(outEvents) -} - -// uniqueAuditEvents assumes auditEvents are sorted by index -// -// returned audit events are guaranteed to have unique indexes by removing -// any audit events containing a previously seen index -func uniqueAuditEvents(auditEvents []apievents.AuditEvent) []apievents.AuditEvent { - var uniqAuditEvents []apievents.AuditEvent - - for i, auditEvent := range auditEvents { - // always add first audit event - if i == 0 { - uniqAuditEvents = append(uniqAuditEvents, auditEvent) - continue - } - - if auditEvent.GetIndex() != auditEvents[i-1].GetIndex() { - uniqAuditEvents = append(uniqAuditEvents, auditEvent) - } - } - - return uniqAuditEvents + return slices.CompactFunc(outEvents, func(a apievents.AuditEvent, b apievents.AuditEvent) bool { + return a.GetIndex() == b.GetIndex() + }) } From a61e11776e75fdb79d25b61c949b6cec4bd694c6 Mon Sep 17 00:00:00 2001 From: Dustin Specker Date: Thu, 2 Jan 2025 16:41:26 -0600 Subject: [PATCH 16/24] fix(lib/events/filesessions): replace int64 conversion with comparison This prevents any unsafe int64 to int conversions. 
--- lib/events/filesessions/fileasync_test.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/lib/events/filesessions/fileasync_test.go b/lib/events/filesessions/fileasync_test.go index 06e3b74965789..2815e5bf34500 100644 --- a/lib/events/filesessions/fileasync_test.go +++ b/lib/events/filesessions/fileasync_test.go @@ -682,7 +682,11 @@ func readStream(ctx context.Context, t *testing.T, uploadID string, uploader *ev // sort audit events by index slices.SortStableFunc(outEvents, func(a apievents.AuditEvent, b apievents.AuditEvent) int { - return int(a.GetIndex() - b.GetIndex()) + if a.GetIndex() < b.GetIndex() { + return -1 + } + + return 1 }) // remove any audit events with duplicate indexes From 11d6eabc6ec0499e7d320e443a4962db3c95bfa6 Mon Sep 17 00:00:00 2001 From: Dustin Specker Date: Thu, 2 Jan 2025 16:54:28 -0600 Subject: [PATCH 17/24] refactor(lib/events/filesessions): use SortFunc instead of SortStableFunc --- lib/events/filesessions/fileasync_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/events/filesessions/fileasync_test.go b/lib/events/filesessions/fileasync_test.go index 2815e5bf34500..c9ad0d20e4a65 100644 --- a/lib/events/filesessions/fileasync_test.go +++ b/lib/events/filesessions/fileasync_test.go @@ -681,7 +681,7 @@ func readStream(ctx context.Context, t *testing.T, uploadID string, uploader *ev } // sort audit events by index - slices.SortStableFunc(outEvents, func(a apievents.AuditEvent, b apievents.AuditEvent) int { + slices.SortFunc(outEvents, func(a apievents.AuditEvent, b apievents.AuditEvent) int { if a.GetIndex() < b.GetIndex() { return -1 } From e5f508d2b8c4e1eff6dc503a33d9bdb363f4a821 Mon Sep 17 00:00:00 2001 From: Dustin Specker Date: Wed, 8 Jan 2025 09:16:44 -0600 Subject: [PATCH 18/24] test(lib/events): combine uploaded parts to be read by single Reader Before, a session recording reader was being created for each uploaded part. 
There are situations where duplicate event indices may be uploaded. This was causing the test cases to have to de-duplicate events. Now, combine all uploaded parts into a single byte slice (bytes.Buffer) to be read by a single session recording reader. The reader already knows to drop duplicate event indices. --- lib/events/filesessions/fileasync_test.go | 36 ++++++++++------------- 1 file changed, 15 insertions(+), 21 deletions(-) diff --git a/lib/events/filesessions/fileasync_test.go b/lib/events/filesessions/fileasync_test.go index c9ad0d20e4a65..b692ec55e8681 100644 --- a/lib/events/filesessions/fileasync_test.go +++ b/lib/events/filesessions/fileasync_test.go @@ -25,7 +25,6 @@ import ( "errors" "os" "path/filepath" - "slices" "sync/atomic" "testing" "time" @@ -668,29 +667,24 @@ func readStream(ctx context.Context, t *testing.T, uploadID string, uploader *ev parts, err := uploader.GetParts(uploadID) require.NoError(t, err) - var outEvents []apievents.AuditEvent + // combine all uploaded parts to create the session recording content + var sessionRecordingContent bytes.Buffer for _, part := range parts { - reader := sessionrecording.NewReader(bytes.NewReader(part)) - - out, err := reader.ReadAll(ctx) - require.NoError(t, err, "part crash %#v", part) - - require.NoError(t, reader.Close(), "error closing session recording reader") - - outEvents = append(outEvents, out...) 
+ bytesWritten, err := sessionRecordingContent.Write(part) + require.NoError(t, err, "error writing part bytes to session recording content") + require.Equal(t, len(part), bytesWritten, "not all bytes were written to session recording content") } - // sort audit events by index - slices.SortFunc(outEvents, func(a apievents.AuditEvent, b apievents.AuditEvent) int { - if a.GetIndex() < b.GetIndex() { - return -1 - } + // Note: it is possible for duplicate event indices to be encountered in cases where the upload process + encounters an error such as the connection being terminated, since the upload process will retry uploading + those events for a successful upload. This is not an issue because session recording reader knows to drop + events found with an event index already read. + reader := sessionrecording.NewReader(&sessionRecordingContent) - return 1 - }) + outEvents, err := reader.ReadAll(ctx) + require.NoError(t, err, "error reading all session recording content") - // remove any audit events with duplicate indexes - return slices.CompactFunc(outEvents, func(a apievents.AuditEvent, b apievents.AuditEvent) bool { - return a.GetIndex() == b.GetIndex() - }) + require.NoError(t, reader.Close(), "error closing session recording reader") + + return outEvents } From 41b208507a296963dc3b7010b62116492f3006b8 Mon Sep 17 00:00:00 2001 From: Dustin Specker Date: Wed, 8 Jan 2025 11:30:56 -0600 Subject: [PATCH 19/24] fix(api/sessionrecording): use underscores in slog key names --- api/sessionrecording/session_recording.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/api/sessionrecording/session_recording.go b/api/sessionrecording/session_recording.go index d970ca9740335..cbcdb6ae66953 100644 --- a/api/sessionrecording/session_recording.go +++ b/api/sessionrecording/session_recording.go @@ -119,10 +119,10 @@ type ReaderStats struct { } // LogValue returns a copy of the stats to be used as log fields func (p ReaderStats) LogValue()
slog.Value { return slog.GroupValue( - slog.Int64("skipped-bytes", p.SkippedBytes), - slog.Int64("skipped-events", p.SkippedEvents), - slog.Int64("out-of-order-events", p.OutOfOrderEvents), - slog.Int64("total-events", p.TotalEvents), + slog.Int64("skipped_bytes", p.SkippedBytes), + slog.Int64("skipped_events", p.SkippedEvents), + slog.Int64("out_of_order_events", p.OutOfOrderEvents), + slog.Int64("total_events", p.TotalEvents), ) } From 01ce51f8f8d1df2361ab1351d0260ebb497cd6e6 Mon Sep 17 00:00:00 2001 From: Dustin Specker Date: Wed, 8 Jan 2025 11:33:28 -0600 Subject: [PATCH 20/24] refactor(api/sessionrecording): scope checkpointIteration to for loop --- api/sessionrecording/session_recording.go | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/api/sessionrecording/session_recording.go b/api/sessionrecording/session_recording.go index cbcdb6ae66953..d9d84eb048ff0 100644 --- a/api/sessionrecording/session_recording.go +++ b/api/sessionrecording/session_recording.go @@ -153,9 +153,7 @@ func (r *Reader) Read(ctx context.Context) (apievents.AuditEvent, error) { // is an extra precaution to avoid // accidental endless loop due to logic error crashing the system // and allows ctx timeout to kick in if specified - var checkpointIteration int64 - for { - checkpointIteration++ + for checkpointIteration := int64(1); ; checkpointIteration++ { if checkpointIteration%maxIterationLimit == 0 { select { case <-ctx.Done(): From f3a81f1cd4e8329b7a35f6d8c111a266a710cdd0 Mon Sep 17 00:00:00 2001 From: Dustin Specker Date: Wed, 8 Jan 2025 11:38:01 -0600 Subject: [PATCH 21/24] refactor(api/sessionrecording): reduce scope of err vars --- api/sessionrecording/session_recording.go | 18 ++++++------------ 1 file changed, 6 insertions(+), 12 deletions(-) diff --git a/api/sessionrecording/session_recording.go b/api/sessionrecording/session_recording.go index d9d84eb048ff0..4adb94c410bbf 100644 --- a/api/sessionrecording/session_recording.go +++ 
b/api/sessionrecording/session_recording.go @@ -172,8 +172,7 @@ func (r *Reader) Read(ctx context.Context) (apievents.AuditEvent, error) { case protoReaderStateInit: // read the part header that consists of the protocol version // and the part size (for the V1 version of the protocol) - _, err := io.ReadFull(r.rawReader, sizeBytes[:Int64Size]) - if err != nil { + if _, err := io.ReadFull(r.rawReader, sizeBytes[:Int64Size]); err != nil { // reached the end of the stream if errors.Is(err, io.EOF) { r.state = protoReaderStateEOF @@ -186,14 +185,12 @@ func (r *Reader) Read(ctx context.Context) (apievents.AuditEvent, error) { return nil, trace.BadParameter("unsupported protocol version %v", protocolVersion) } // read size of this gzipped part as encoded by V1 protocol version - _, err = io.ReadFull(r.rawReader, sizeBytes[:Int64Size]) - if err != nil { + if _, err := io.ReadFull(r.rawReader, sizeBytes[:Int64Size]); err != nil { return nil, r.setError(trace.ConvertSystemError(err)) } partSize := binary.BigEndian.Uint64(sizeBytes[:Int64Size]) // read padding size (could be 0) - _, err = io.ReadFull(r.rawReader, sizeBytes[:Int64Size]) - if err != nil { + if _, err := io.ReadFull(r.rawReader, sizeBytes[:Int64Size]); err != nil { return nil, r.setError(trace.ConvertSystemError(err)) } r.padding = int64(binary.BigEndian.Uint64(sizeBytes[:Int64Size])) @@ -216,8 +213,7 @@ func (r *Reader) Read(ctx context.Context) (apievents.AuditEvent, error) { // the record consists of length of the protobuf encoded // message and the message itself - _, err := io.ReadFull(r.gzipReader, sizeBytes[:Int32Size]) - if err != nil { + if _, err := io.ReadFull(r.gzipReader, sizeBytes[:Int32Size]); err != nil { if !errors.Is(err, io.EOF) { return nil, r.setError(trace.ConvertSystemError(err)) } @@ -264,13 +260,11 @@ func (r *Reader) Read(ctx context.Context) (apievents.AuditEvent, error) { if messageSize == 0 { return nil, r.setError(trace.BadParameter("unexpected message size 0")) } - _, err = 
io.ReadFull(r.gzipReader, messageBytes[:messageSize]) - if err != nil { + if _, err := io.ReadFull(r.gzipReader, messageBytes[:messageSize]); err != nil { return nil, r.setError(trace.ConvertSystemError(err)) } oneof := apievents.OneOf{} - err = oneof.Unmarshal(messageBytes[:messageSize]) - if err != nil { + if err := oneof.Unmarshal(messageBytes[:messageSize]); err != nil { return nil, trace.Wrap(err) } event, err := apievents.FromOneOf(oneof) From 09b08846147eb83342ba2f5d613afbb9f693c22b Mon Sep 17 00:00:00 2001 From: Dustin Specker Date: Wed, 8 Jan 2025 11:42:13 -0600 Subject: [PATCH 22/24] refactor(api/sessionrecording): avoid creating empty struct --- api/sessionrecording/session_recording.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/api/sessionrecording/session_recording.go b/api/sessionrecording/session_recording.go index 4adb94c410bbf..ed72ec36a5c5b 100644 --- a/api/sessionrecording/session_recording.go +++ b/api/sessionrecording/session_recording.go @@ -263,7 +263,7 @@ func (r *Reader) Read(ctx context.Context) (apievents.AuditEvent, error) { if _, err := io.ReadFull(r.gzipReader, messageBytes[:messageSize]); err != nil { return nil, r.setError(trace.ConvertSystemError(err)) } - oneof := apievents.OneOf{} + var oneof apievents.OneOf if err := oneof.Unmarshal(messageBytes[:messageSize]); err != nil { return nil, trace.Wrap(err) } From 4ca0689db34b7c5e20da7f55b027b33e920beac4 Mon Sep 17 00:00:00 2001 From: Dustin Specker Date: Wed, 8 Jan 2025 12:12:36 -0600 Subject: [PATCH 23/24] refactor(api/sessionrecording): move size and message bytes to Reader struct This will prevent Go from needing to allocate and zero out 64K for each invocation of Read. This may possibly reduce the chances of Go doing a heap allocation. 
--- api/sessionrecording/session_recording.go | 32 +++++++++++------------ 1 file changed, 16 insertions(+), 16 deletions(-) diff --git a/api/sessionrecording/session_recording.go b/api/sessionrecording/session_recording.go index ed72ec36a5c5b..6d0039cf1d01a 100644 --- a/api/sessionrecording/session_recording.go +++ b/api/sessionrecording/session_recording.go @@ -88,6 +88,10 @@ type Reader struct { padding int64 // rawReader is the raw data source we read from rawReader io.Reader + // sizeBytes is used to hold the header of the current event being parsed + sizeBytes [Int64Size]byte + // messageBytes holds the current decompressed event being parsed + messageBytes [MaxProtoMessageSizeBytes]byte // state tracks where the Reader is at in consuming a session recording state int // error holds any error encountered while reading a session recording @@ -147,8 +151,6 @@ func (r *Reader) GetStats() ReaderStats { // Read returns next event or io.EOF in case of the end of the parts func (r *Reader) Read(ctx context.Context) (apievents.AuditEvent, error) { - var sizeBytes [Int64Size]byte - // periodic checks of context after fixed amount of iterations // is an extra precaution to avoid // accidental endless loop due to logic error crashing the system @@ -172,7 +174,7 @@ func (r *Reader) Read(ctx context.Context) (apievents.AuditEvent, error) { case protoReaderStateInit: // read the part header that consists of the protocol version // and the part size (for the V1 version of the protocol) - if _, err := io.ReadFull(r.rawReader, sizeBytes[:Int64Size]); err != nil { + if _, err := io.ReadFull(r.rawReader, r.sizeBytes[:Int64Size]); err != nil { // reached the end of the stream if errors.Is(err, io.EOF) { r.state = protoReaderStateEOF @@ -180,20 +182,20 @@ func (r *Reader) Read(ctx context.Context) (apievents.AuditEvent, error) { } return nil, r.setError(trace.ConvertSystemError(err)) } - protocolVersion := binary.BigEndian.Uint64(sizeBytes[:Int64Size]) + protocolVersion := 
binary.BigEndian.Uint64(r.sizeBytes[:Int64Size]) if protocolVersion != ProtoStreamV1 { return nil, trace.BadParameter("unsupported protocol version %v", protocolVersion) } // read size of this gzipped part as encoded by V1 protocol version - if _, err := io.ReadFull(r.rawReader, sizeBytes[:Int64Size]); err != nil { + if _, err := io.ReadFull(r.rawReader, r.sizeBytes[:Int64Size]); err != nil { return nil, r.setError(trace.ConvertSystemError(err)) } - partSize := binary.BigEndian.Uint64(sizeBytes[:Int64Size]) + partSize := binary.BigEndian.Uint64(r.sizeBytes[:Int64Size]) // read padding size (could be 0) - if _, err := io.ReadFull(r.rawReader, sizeBytes[:Int64Size]); err != nil { + if _, err := io.ReadFull(r.rawReader, r.sizeBytes[:Int64Size]); err != nil { return nil, r.setError(trace.ConvertSystemError(err)) } - r.padding = int64(binary.BigEndian.Uint64(sizeBytes[:Int64Size])) + r.padding = int64(binary.BigEndian.Uint64(r.sizeBytes[:Int64Size])) r.partReader = io.LimitReader(r.rawReader, int64(partSize)) gzipReader, err := gzip.NewReader(r.partReader) // older bugged versions of teleport would sometimes incorrectly inject padding bytes into @@ -209,11 +211,9 @@ func (r *Reader) Read(ctx context.Context) (apievents.AuditEvent, error) { continue // read the next version from the gzip reader case protoReaderStateCurrent: - var messageBytes [MaxProtoMessageSizeBytes]byte - // the record consists of length of the protobuf encoded // message and the message itself - if _, err := io.ReadFull(r.gzipReader, sizeBytes[:Int32Size]); err != nil { + if _, err := io.ReadFull(r.gzipReader, r.sizeBytes[:Int32Size]); err != nil { if !errors.Is(err, io.EOF) { return nil, r.setError(trace.ConvertSystemError(err)) } @@ -221,7 +221,7 @@ func (r *Reader) Read(ctx context.Context) (apievents.AuditEvent, error) { // due to a bug in older versions of teleport it was possible that padding // bytes would end up inside of the gzip section of the archive. 
we should // skip any dangling data in the gzip secion. - n, err := io.CopyBuffer(io.Discard, r.partReader, messageBytes[:]) + n, err := io.CopyBuffer(io.Discard, r.partReader, r.messageBytes[:]) if err != nil { return nil, r.setError(trace.ConvertSystemError(err)) } @@ -238,7 +238,7 @@ func (r *Reader) Read(ctx context.Context) (apievents.AuditEvent, error) { return nil, r.setError(trace.ConvertSystemError(err)) } if r.padding != 0 { - skipped, err := io.CopyBuffer(io.Discard, io.LimitReader(r.rawReader, r.padding), messageBytes[:]) + skipped, err := io.CopyBuffer(io.Discard, io.LimitReader(r.rawReader, r.padding), r.messageBytes[:]) if err != nil { return nil, r.setError(trace.ConvertSystemError(err)) } @@ -252,7 +252,7 @@ func (r *Reader) Read(ctx context.Context) (apievents.AuditEvent, error) { r.state = protoReaderStateInit continue } - messageSize := binary.BigEndian.Uint32(sizeBytes[:Int32Size]) + messageSize := binary.BigEndian.Uint32(r.sizeBytes[:Int32Size]) // zero message size indicates end of the part // that sometimes is present in partially submitted parts // that have to be filled with zeroes for parts smaller @@ -260,11 +260,11 @@ func (r *Reader) Read(ctx context.Context) (apievents.AuditEvent, error) { if messageSize == 0 { return nil, r.setError(trace.BadParameter("unexpected message size 0")) } - if _, err := io.ReadFull(r.gzipReader, messageBytes[:messageSize]); err != nil { + if _, err := io.ReadFull(r.gzipReader, r.messageBytes[:messageSize]); err != nil { return nil, r.setError(trace.ConvertSystemError(err)) } var oneof apievents.OneOf - if err := oneof.Unmarshal(messageBytes[:messageSize]); err != nil { + if err := oneof.Unmarshal(r.messageBytes[:messageSize]); err != nil { return nil, trace.Wrap(err) } event, err := apievents.FromOneOf(oneof) From e93d2c7369ca8edbc4339333c31968c6b3fe9464 Mon Sep 17 00:00:00 2001 From: Dustin Specker Date: Wed, 8 Jan 2025 13:48:33 -0600 Subject: [PATCH 24/24] docs(lib/events): remove untrue comment This was 
true, but is no longer true since sessionrecording.Reader does this behavior by default. --- lib/events/filesessions/fileasync_test.go | 1 - 1 file changed, 1 deletion(-) diff --git a/lib/events/filesessions/fileasync_test.go b/lib/events/filesessions/fileasync_test.go index b692ec55e8681..22afcd16aa718 100644 --- a/lib/events/filesessions/fileasync_test.go +++ b/lib/events/filesessions/fileasync_test.go @@ -660,7 +660,6 @@ func emitStream(ctx context.Context, t *testing.T, streamer events.Streamer, inE } // readStream reads and decodes the audit stream from uploadID -// Additionally, readStream will remove any duplicate audit events (by index) encountered. func readStream(ctx context.Context, t *testing.T, uploadID string, uploader *eventstest.MemoryUploader) []apievents.AuditEvent { t.Helper()