From 062ae0bf37f82ba8254a8fee9cfb2ab2f26f1ffe Mon Sep 17 00:00:00 2001 From: Shiwei Zhang Date: Thu, 31 Oct 2024 22:11:40 +0800 Subject: [PATCH 01/15] feat: add track manager Signed-off-by: Shiwei Zhang --- internal/experimental/track/interface.go | 47 ++++++++++++++++++++++++ 1 file changed, 47 insertions(+) create mode 100644 internal/experimental/track/interface.go diff --git a/internal/experimental/track/interface.go b/internal/experimental/track/interface.go new file mode 100644 index 000000000..d80bfda5e --- /dev/null +++ b/internal/experimental/track/interface.go @@ -0,0 +1,47 @@ +package track + +import ( + "io" + + ocispec "github.com/opencontainers/image-spec/specs-go/v1" +) + +// State represents the state of a descriptor. +type State int + +const ( + StateUnknown State = iota + StateStarted + StateStopped + StateExists + StateSkipped + StateMounted +) + +// Status represents the status of a descriptor. +type Status struct { + State State + Offset int64 +} + +// Tracker updates the status of a descriptor. +type Tracker interface { + io.Closer + + // Update updates the status of the descriptor. + Update(status Status) error + + // Fail marks the descriptor as failed. + Fail(err error) error +} + +// Manager tracks the progress of multiple descriptors. +type Manager interface { + io.Closer + + // Record records the progress of a descriptor. + Record(desc ocispec.Descriptor, status Status) error + + // Track starts tracking the progress of a descriptor. + Track(desc ocispec.Descriptor) (Tracker, error) +} From 48faae89c46ced0342099aefedd34aa295fae348 Mon Sep 17 00:00:00 2001 From: Shiwei Zhang Date: Thu, 31 Oct 2024 23:37:42 +0800 Subject: [PATCH 02/15] feat: use new track Signed-off-by: Shiwei Zhang --- .../display/status/progress/manager.go | 36 ++++-------- .../display/status/progress/messenger.go | 51 +++++++++++----- .../internal/display/status/track/reader.go | 58 +++++++++++-------- .../internal/display/status/track/target.go | 30 +++++----- cmd/oras/internal/display/status/tty.go | 32 +++++++--- cmd/oras/root/cp.go | 45 ++++++++++++++ internal/experimental/track/interface.go | 27 +++++++-- 7 files changed, 187 insertions(+), 92 deletions(-) diff --git a/cmd/oras/internal/display/status/progress/manager.go b/cmd/oras/internal/display/status/progress/manager.go index 28e61d5e0..db5981f03 100644 --- a/cmd/oras/internal/display/status/progress/manager.go +++ b/cmd/oras/internal/display/status/progress/manager.go @@ -23,6 +23,7 @@ import ( ocispec "github.com/opencontainers/image-spec/specs-go/v1" "oras.land/oras/cmd/oras/internal/display/status/console" + "oras.land/oras/internal/experimental/track" ) const ( @@ -34,13 +35,6 @@ const ( var errManagerStopped = errors.New("progress output manager has already been stopped") -// Manager is progress view master -type Manager interface { - Add() (*Messenger, error) - SendAndStop(desc ocispec.Descriptor, prompt string) error - Close() error -} - type manager struct { status []*status statusLock sync.RWMutex @@ -48,10 +42,11 @@ type manager struct { updating sync.WaitGroup renderDone chan struct{} renderClosed chan struct{} + prompt map[track.State]string } // NewManager initialized a new progress manager. -func NewManager(tty *os.File) (Manager, error) { +func NewManager(tty *os.File, prompt map[track.State]string) (track.Manager, error) { c, err := console.NewConsole(tty) if err != nil { return nil, err @@ -103,8 +98,8 @@ func (m *manager) render() { } } -// Add appends a new status with 2-line space for rendering. -func (m *manager) Add() (*Messenger, error) { +// Track appends a new status with 2-line space for rendering. +func (m *manager) Track(desc ocispec.Descriptor) (track.Tracker, error) { if m.closed() { return nil, errManagerStopped } @@ -116,21 +111,10 @@ func (m *manager) Add() (*Messenger, error) { defer m.console.NewRow() defer m.console.NewRow() - return m.statusChan(s), nil -} - -// SendAndStop send message for descriptor and stop timing. -func (m *manager) SendAndStop(desc ocispec.Descriptor, prompt string) error { - messenger, err := m.Add() - if err != nil { - return err - } - messenger.Send(prompt, desc, desc.Size) - messenger.Stop() - return nil + return m.statusChan(s, desc), nil } -func (m *manager) statusChan(s *status) *Messenger { +func (m *manager) statusChan(s *status, desc ocispec.Descriptor) track.Tracker { ch := make(chan *status, BufferSize) m.updating.Add(1) go func() { @@ -139,7 +123,11 @@ func (m *manager) statusChan(s *status) *Messenger { s.update(newStatus) } }() - return &Messenger{ch: ch} + return &Messenger{ + ch: ch, + desc: desc, + prompt: m.prompt, + } } // Close stops all status and waits for updating and rendering. diff --git a/cmd/oras/internal/display/status/progress/messenger.go b/cmd/oras/internal/display/status/progress/messenger.go index 9f0188b5a..37da92314 100644 --- a/cmd/oras/internal/display/status/progress/messenger.go +++ b/cmd/oras/internal/display/status/progress/messenger.go @@ -16,32 +16,53 @@ limitations under the License. package progress import ( + "time" + ocispec "github.com/opencontainers/image-spec/specs-go/v1" "oras.land/oras/cmd/oras/internal/display/status/progress/humanize" - "time" + "oras.land/oras/internal/experimental/track" ) // Messenger is progress message channel. type Messenger struct { ch chan *status closed bool + desc ocispec.Descriptor + prompt map[track.State]string +} + +func (m *Messenger) Update(status track.Status) error { + if status.State == track.StateInitialized { + m.start() + } + m.send(m.prompt[status.State], status.Offset) + return nil +} + +func (m *Messenger) Fail(err error) error { + return err +} + +func (m *Messenger) Close() error { + m.stop() + return nil } -// Start initializes the messenger. -func (sm *Messenger) Start() { - if sm.ch == nil { +// start initializes the messenger. +func (m *Messenger) start() { + if m.ch == nil { return } - sm.ch <- startTiming() + m.ch <- startTiming() } -// Send a status message for the specified descriptor. -func (sm *Messenger) Send(prompt string, descriptor ocispec.Descriptor, offset int64) { +// send a status message for the specified descriptor. +func (m *Messenger) send(prompt string, offset int64) { for { select { - case sm.ch <- newStatusMessage(prompt, descriptor, offset): + case m.ch <- newStatusMessage(prompt, m.desc, offset): return - case <-sm.ch: + case <-m.ch: // purge the channel until successfully pushed default: // ch is nil @@ -50,14 +71,14 @@ func (sm *Messenger) Send(prompt string, descriptor ocispec.Descriptor, offset i } } -// Stop the messenger after sending a end message. -func (sm *Messenger) Stop() { - if sm.closed { +// stop the messenger after sending a end message. +func (m *Messenger) stop() { + if m.closed { return } - sm.ch <- endTiming() - close(sm.ch) - sm.closed = true + m.ch <- endTiming() + close(m.ch) + m.closed = true } // newStatus generates a base empty status. diff --git a/cmd/oras/internal/display/status/track/reader.go b/cmd/oras/internal/display/status/track/reader.go index 93919381f..4901e5ac6 100644 --- a/cmd/oras/internal/display/status/track/reader.go +++ b/cmd/oras/internal/display/status/track/reader.go @@ -21,40 +21,43 @@ import ( ocispec "github.com/opencontainers/image-spec/specs-go/v1" "oras.land/oras/cmd/oras/internal/display/status/progress" + "oras.land/oras/internal/experimental/track" ) type reader struct { - base io.Reader - offset int64 - actionPrompt string - donePrompt string - descriptor ocispec.Descriptor - manager progress.Manager - messenger *progress.Messenger + base io.Reader + offset int64 + size int64 + manager track.Manager + messenger track.Tracker } // NewReader returns a new reader with tracked progress. func NewReader(r io.Reader, descriptor ocispec.Descriptor, actionPrompt string, donePrompt string, tty *os.File) (*reader, error) { - manager, err := progress.NewManager(tty) + prompt := map[track.State]string{ + track.StateInitialized: actionPrompt, + track.StateTransmitting: actionPrompt, + track.StateTransmitted: donePrompt, + } + + manager, err := progress.NewManager(tty, prompt) if err != nil { return nil, err } - return managedReader(r, descriptor, manager, actionPrompt, donePrompt) + return managedReader(r, descriptor, manager) } -func managedReader(r io.Reader, descriptor ocispec.Descriptor, manager progress.Manager, actionPrompt string, donePrompt string) (*reader, error) { - messenger, err := manager.Add() +func managedReader(r io.Reader, descriptor ocispec.Descriptor, manager track.Manager) (*reader, error) { + messenger, err := manager.Track(descriptor) if err != nil { return nil, err } return &reader{ - base: r, - descriptor: descriptor, - actionPrompt: actionPrompt, - donePrompt: donePrompt, - manager: manager, - messenger: messenger, + base: r, + size: descriptor.Size, + manager: manager, + messenger: messenger, }, nil } @@ -66,18 +69,24 @@ func (r *reader) StopManager() { // Done sends message to mark the tracked progress as complete. func (r *reader) Done() { - r.messenger.Send(r.donePrompt, r.descriptor, r.descriptor.Size) - r.messenger.Stop() + r.messenger.Update(track.Status{ + State: track.StateTransmitted, + Offset: r.size, + }) + r.messenger.Close() } // Close closes the update channel. func (r *reader) Close() { - r.messenger.Stop() + r.messenger.Close() } // Start sends the start timing to the messenger channel. func (r *reader) Start() { - r.messenger.Start() + r.messenger.Update(track.Status{ + State: track.StateInitialized, + Offset: -1, + }) } // Read reads from the underlying reader and updates the progress. @@ -89,10 +98,13 @@ func (r *reader) Read(p []byte) (int, error) { r.offset = r.offset + int64(n) if err == io.EOF { - if r.offset != r.descriptor.Size { + if r.offset != r.size { return n, io.ErrUnexpectedEOF } } - r.messenger.Send(r.actionPrompt, r.descriptor, r.offset) + r.messenger.Update(track.Status{ + State: track.StateTransmitting, + Offset: r.offset, + }) return n, err } diff --git a/cmd/oras/internal/display/status/track/target.go b/cmd/oras/internal/display/status/track/target.go index dce64201b..7c5421506 100644 --- a/cmd/oras/internal/display/status/track/target.go +++ b/cmd/oras/internal/display/status/track/target.go @@ -26,20 +26,19 @@ import ( "oras.land/oras-go/v2/errdef" "oras.land/oras-go/v2/registry" "oras.land/oras/cmd/oras/internal/display/status/progress" + "oras.land/oras/internal/experimental/track" ) // GraphTarget is a tracked oras.GraphTarget. type GraphTarget interface { oras.GraphTarget io.Closer - Prompt(desc ocispec.Descriptor, prompt string) error + Report(desc ocispec.Descriptor, state track.State) error } type graphTarget struct { oras.GraphTarget - manager progress.Manager - actionPrompt string - donePrompt string + manager track.Manager } type referenceGraphTarget struct { @@ -47,16 +46,14 @@ type referenceGraphTarget struct { } // NewTarget creates a new tracked Target. -func NewTarget(t oras.GraphTarget, actionPrompt, donePrompt string, tty *os.File) (GraphTarget, error) { - manager, err := progress.NewManager(tty) +func NewTarget(t oras.GraphTarget, prompt map[track.State]string, tty *os.File) (GraphTarget, error) { + manager, err := progress.NewManager(tty, prompt) if err != nil { return nil, err } gt := &graphTarget{ - GraphTarget: t, - manager: manager, - actionPrompt: actionPrompt, - donePrompt: donePrompt, + GraphTarget: t, + manager: manager, } if _, ok := t.(registry.ReferencePusher); ok { @@ -76,7 +73,7 @@ func (t *graphTarget) Mount(ctx context.Context, desc ocispec.Descriptor, fromRe // Push pushes the content to the base oras.GraphTarget with tracking. func (t *graphTarget) Push(ctx context.Context, expected ocispec.Descriptor, content io.Reader) error { - r, err := managedReader(content, expected, t.manager, t.actionPrompt, t.donePrompt) + r, err := managedReader(content, expected, t.manager) if err != nil { return err } @@ -95,7 +92,7 @@ func (t *graphTarget) Push(ctx context.Context, expected ocispec.Descriptor, con // PushReference pushes the content to the base oras.GraphTarget with tracking. func (rgt *referenceGraphTarget) PushReference(ctx context.Context, expected ocispec.Descriptor, content io.Reader, reference string) error { - r, err := managedReader(content, expected, rgt.manager, rgt.actionPrompt, rgt.donePrompt) + r, err := managedReader(content, expected, rgt.manager) if err != nil { return err } @@ -114,7 +111,10 @@ func (t *graphTarget) Close() error { return t.manager.Close() } -// Prompt prompts the user with the provided prompt and descriptor. -func (t *graphTarget) Prompt(desc ocispec.Descriptor, prompt string) error { - return t.manager.SendAndStop(desc, prompt) +// Report prompts the user with the provided state and descriptor. +func (t *graphTarget) Report(desc ocispec.Descriptor, state track.State) error { + return track.Record(t.manager, desc, track.Status{ + State: state, + Offset: desc.Size, + }) } diff --git a/cmd/oras/internal/display/status/tty.go b/cmd/oras/internal/display/status/tty.go index 369a33e04..304a4faae 100644 --- a/cmd/oras/internal/display/status/tty.go +++ b/cmd/oras/internal/display/status/tty.go @@ -25,13 +25,14 @@ import ( ocispec "github.com/opencontainers/image-spec/specs-go/v1" "oras.land/oras-go/v2" "oras.land/oras-go/v2/content" - "oras.land/oras/cmd/oras/internal/display/status/track" + strack "oras.land/oras/cmd/oras/internal/display/status/track" + "oras.land/oras/internal/experimental/track" ) // TTYPushHandler handles TTY status output for push command. type TTYPushHandler struct { tty *os.File - tracked track.GraphTarget + tracked strack.GraphTarget committed *sync.Map fetcher content.Fetcher } @@ -57,7 +58,13 @@ func (ph *TTYPushHandler) OnEmptyArtifact() error { // TrackTarget returns a tracked target. func (ph *TTYPushHandler) TrackTarget(gt oras.GraphTarget) (oras.GraphTarget, StopTrackTargetFunc, error) { - tracked, err := track.NewTarget(gt, PushPromptUploading, PushPromptUploaded, ph.tty) + prompt := map[track.State]string{ + track.StateInitialized: PushPromptUploading, + track.StateTransmitting: PushPromptUploading, + track.StateTransmitted: PushPromptUploaded, + track.StateExists: PushPromptExists, + } + tracked, err := strack.NewTarget(gt, prompt, ph.tty) if err != nil { return nil, nil, err } @@ -68,7 +75,7 @@ func (ph *TTYPushHandler) TrackTarget(gt oras.GraphTarget) (oras.GraphTarget, St // OnCopySkipped is called when an object already exists. func (ph *TTYPushHandler) OnCopySkipped(_ context.Context, desc ocispec.Descriptor) error { ph.committed.Store(desc.Digest.String(), desc.Annotations[ocispec.AnnotationTitle]) - return ph.tracked.Prompt(desc, PushPromptExists) + return ph.tracked.Report(desc, track.StateExists) } // PreCopy implements PreCopy of CopyHandler. @@ -84,7 +91,7 @@ func (ph *TTYPushHandler) PostCopy(ctx context.Context, desc ocispec.Descriptor) return err } for _, successor := range successors { - if err = ph.tracked.Prompt(successor, PushPromptSkipped); err != nil { + if err = ph.tracked.Report(successor, track.StateSkipped); err != nil { return err } } @@ -99,7 +106,7 @@ func NewTTYAttachHandler(tty *os.File, fetcher content.Fetcher) AttachHandler { // TTYPullHandler handles TTY status output for pull events. type TTYPullHandler struct { tty *os.File - tracked track.GraphTarget + tracked strack.GraphTarget } // NewTTYPullHandler returns a new handler for Pull status events. @@ -126,17 +133,24 @@ func (ph *TTYPullHandler) OnNodeProcessing(_ ocispec.Descriptor) error { // OnNodeRestored implements PullHandler. func (ph *TTYPullHandler) OnNodeRestored(desc ocispec.Descriptor) error { - return ph.tracked.Prompt(desc, PullPromptRestored) + return ph.tracked.Report(desc, track.StateMounted) } // OnNodeSkipped implements PullHandler. func (ph *TTYPullHandler) OnNodeSkipped(desc ocispec.Descriptor) error { - return ph.tracked.Prompt(desc, PullPromptSkipped) + return ph.tracked.Report(desc, track.StateSkipped) } // TrackTarget returns a tracked target. func (ph *TTYPullHandler) TrackTarget(gt oras.GraphTarget) (oras.GraphTarget, StopTrackTargetFunc, error) { - tracked, err := track.NewTarget(gt, PullPromptDownloading, PullPromptPulled, ph.tty) + prompt := map[track.State]string{ + track.StateInitialized: PullPromptDownloading, + track.StateTransmitting: PullPromptDownloading, + track.StateTransmitted: PullPromptPulled, + track.StateSkipped: PullPromptSkipped, + track.StateMounted: PullPromptRestored, + } + tracked, err := strack.NewTarget(gt, prompt, ph.tty) if err != nil { return nil, nil, err } diff --git a/cmd/oras/root/cp.go b/cmd/oras/root/cp.go index c6693edcb..2a5cc5cd4 100644 --- a/cmd/oras/root/cp.go +++ b/cmd/oras/root/cp.go @@ -34,9 +34,11 @@ import ( "oras.land/oras/cmd/oras/internal/command" "oras.land/oras/cmd/oras/internal/display" "oras.land/oras/cmd/oras/internal/display/status" + strack "oras.land/oras/cmd/oras/internal/display/status/track" oerrors "oras.land/oras/cmd/oras/internal/errors" "oras.land/oras/cmd/oras/internal/option" "oras.land/oras/internal/docker" + "oras.land/oras/internal/experimental/track" "oras.land/oras/internal/graph" "oras.land/oras/internal/listener" "oras.land/oras/internal/registryutil" @@ -175,6 +177,49 @@ func doCopy(ctx context.Context, copyHandler status.CopyHandler, src oras.ReadOn dst, err = copyHandler.StartTracking(dst) if err != nil { return desc, err + if opts.TTY == nil { + // no TTY output + extendedCopyOptions.OnCopySkipped = copyHandler.OnCopySkipped + extendedCopyOptions.PreCopy = copyHandler.PreCopy + extendedCopyOptions.PostCopy = copyHandler.PostCopy + extendedCopyOptions.OnMounted = copyHandler.OnMounted + } else { + // TTY output + prompt := map[track.State]string{ + track.StateInitialized: promptCopying, + track.StateTransmitting: promptCopying, + track.StateTransmitted: promptCopied, + track.StateExists: promptExists, + track.StateSkipped: promptSkipped, + track.StateMounted: promptMounted, + } + tracked, err := strack.NewTarget(dst, prompt, opts.TTY) + if err != nil { + return ocispec.Descriptor{}, err + } + defer tracked.Close() + dst = tracked + extendedCopyOptions.OnCopySkipped = func(ctx context.Context, desc ocispec.Descriptor) error { + committed.Store(desc.Digest.String(), desc.Annotations[ocispec.AnnotationTitle]) + return tracked.Report(desc, track.StateExists) + } + extendedCopyOptions.PostCopy = func(ctx context.Context, desc ocispec.Descriptor) error { + committed.Store(desc.Digest.String(), desc.Annotations[ocispec.AnnotationTitle]) + successors, err := graph.FilteredSuccessors(ctx, desc, tracked, status.DeduplicatedFilter(committed)) + if err != nil { + return err + } + for _, successor := range successors { + if err = tracked.Report(successor, track.StateSkipped); err != nil { + return err + } + } + return nil + } + extendedCopyOptions.OnMounted = func(ctx context.Context, desc ocispec.Descriptor) error { + committed.Store(desc.Digest.String(), desc.Annotations[ocispec.AnnotationTitle]) + return tracked.Report(desc, track.StateMounted) + } } defer func() { stopErr := copyHandler.StopTracking() diff --git a/internal/experimental/track/interface.go b/internal/experimental/track/interface.go index d80bfda5e..1975102a3 100644 --- a/internal/experimental/track/interface.go +++ b/internal/experimental/track/interface.go @@ -11,8 +11,9 @@ type State int const ( StateUnknown State = iota - StateStarted - StateStopped + StateInitialized + StateTransmitting + StateTransmitted StateExists StateSkipped StateMounted @@ -20,7 +21,11 @@ const ( // Status represents the status of a descriptor. type Status struct { - State State + // State represents the state of the descriptor. + State State + + // Offset represents the current offset of the descriptor. + // Offset is discarded if set to a negative value. Offset int64 } @@ -39,9 +44,19 @@ type Tracker interface { type Manager interface { io.Closer - // Record records the progress of a descriptor. - Record(desc ocispec.Descriptor, status Status) error - // Track starts tracking the progress of a descriptor. Track(desc ocispec.Descriptor) (Tracker, error) } + +// Record records the progress of a descriptor. +func Record(m Manager, desc ocispec.Descriptor, status Status) error { + tracker, err := m.Track(desc) + if err != nil { + return err + } + err = tracker.Update(status) + if err != nil { + return err + } + return tracker.Close() +} From fb7e66286e1ed2ac75224f9bcf59c0068287f183 Mon Sep 17 00:00:00 2001 From: Shiwei Zhang Date: Fri, 1 Nov 2024 12:13:28 +0800 Subject: [PATCH 03/15] refactor: new reader Signed-off-by: Shiwei Zhang --- .../internal/display/status/track/reader.go | 58 ++--------------- internal/experimental/track/reader.go | 64 +++++++++++++++++++ 2 files changed, 70 insertions(+), 52 deletions(-) create mode 100644 internal/experimental/track/reader.go diff --git a/cmd/oras/internal/display/status/track/reader.go b/cmd/oras/internal/display/status/track/reader.go index 4901e5ac6..456cc4dfd 100644 --- a/cmd/oras/internal/display/status/track/reader.go +++ b/cmd/oras/internal/display/status/track/reader.go @@ -25,11 +25,9 @@ import ( ) type reader struct { - base io.Reader - offset int64 - size int64 - manager track.Manager - messenger track.Tracker + *track.ReadTracker + + manager track.Manager } // NewReader returns a new reader with tracked progress. @@ -48,16 +46,14 @@ func NewReader(r io.Reader, descriptor ocispec.Descriptor, actionPrompt string, } func managedReader(r io.Reader, descriptor ocispec.Descriptor, manager track.Manager) (*reader, error) { - messenger, err := manager.Track(descriptor) + tracker, err := track.NewReadTracker(manager, descriptor, r) if err != nil { return nil, err } return &reader{ - base: r, - size: descriptor.Size, - manager: manager, - messenger: messenger, + ReadTracker: tracker, + manager: manager, }, nil } @@ -66,45 +62,3 @@ func (r *reader) StopManager() { r.Close() _ = r.manager.Close() } - -// Done sends message to mark the tracked progress as complete. -func (r *reader) Done() { - r.messenger.Update(track.Status{ - State: track.StateTransmitted, - Offset: r.size, - }) - r.messenger.Close() -} - -// Close closes the update channel. -func (r *reader) Close() { - r.messenger.Close() -} - -// Start sends the start timing to the messenger channel. -func (r *reader) Start() { - r.messenger.Update(track.Status{ - State: track.StateInitialized, - Offset: -1, - }) -} - -// Read reads from the underlying reader and updates the progress. -func (r *reader) Read(p []byte) (int, error) { - n, err := r.base.Read(p) - if err != nil && err != io.EOF { - return n, err - } - - r.offset = r.offset + int64(n) - if err == io.EOF { - if r.offset != r.size { - return n, io.ErrUnexpectedEOF - } - } - r.messenger.Update(track.Status{ - State: track.StateTransmitting, - Offset: r.offset, - }) - return n, err -} diff --git a/internal/experimental/track/reader.go b/internal/experimental/track/reader.go new file mode 100644 index 000000000..a5533f1eb --- /dev/null +++ b/internal/experimental/track/reader.go @@ -0,0 +1,64 @@ +package track + +import ( + "io" + + ocispec "github.com/opencontainers/image-spec/specs-go/v1" +) + +// ReadTracker tracks the transmission based on the read operation. +type ReadTracker struct { + base io.Reader + tracker Tracker + offset int64 +} + +// NewReadTracker returns a new ReadTracker. +func NewReadTracker(manager Manager, descriptor ocispec.Descriptor, r io.Reader) (*ReadTracker, error) { + tracker, err := manager.Track(descriptor) + if err != nil { + return nil, err + } + return &ReadTracker{ + base: r, + tracker: tracker, + }, nil +} + +// Read reads from the base reader and updates the status. +func (rt *ReadTracker) Read(p []byte) (n int, err error) { + n, err = rt.base.Read(p) + rt.offset += int64(n) + _ = rt.tracker.Update(Status{ + State: StateTransmitting, + Offset: rt.offset, + }) + if err != nil && err != io.EOF { + _ = rt.tracker.Fail(err) + } + return n, err +} + +// Close closes the tracker. +func (rt *ReadTracker) Close() error { + return rt.tracker.Close() +} + +// Start starts tracking the transmission. +func (rt *ReadTracker) Start() error { + return rt.tracker.Update(Status{ + State: StateInitialized, + Offset: -1, + }) +} + +// Done marks the transmission as complete. +// Done should be called after the transmission is complete. +// Note: Reading all content from the reader does not imply the transmission is +// complete. +func (rt *ReadTracker) Done() error { + return rt.tracker.Update(Status{ + State: StateTransmitted, + Offset: -1, + }) +} From f88f404f002b2f39b6f07caecc8659926432337e Mon Sep 17 00:00:00 2001 From: Shiwei Zhang Date: Fri, 1 Nov 2024 12:21:05 +0800 Subject: [PATCH 04/15] doc: improve the doc for Record Signed-off-by: Shiwei Zhang --- internal/experimental/track/interface.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/experimental/track/interface.go b/internal/experimental/track/interface.go index 1975102a3..972d8d519 100644 --- a/internal/experimental/track/interface.go +++ b/internal/experimental/track/interface.go @@ -48,7 +48,7 @@ type Manager interface { Track(desc ocispec.Descriptor) (Tracker, error) } -// Record records the progress of a descriptor. +// Record adds the progress of a descriptor as a single entry. func Record(m Manager, desc ocispec.Descriptor, status Status) error { tracker, err := m.Track(desc) if err != nil { From 1a807bfcd6f0a69af4d1136280c2b6bf80ee5fac Mon Sep 17 00:00:00 2001 From: Shiwei Zhang Date: Fri, 1 Nov 2024 12:28:46 +0800 Subject: [PATCH 05/15] refactor: better tracker Signed-off-by: Shiwei Zhang --- cmd/oras/internal/display/status/track/reader.go | 4 ++-- internal/experimental/track/reader.go | 14 ++++---------- 2 files changed, 6 insertions(+), 12 deletions(-) diff --git a/cmd/oras/internal/display/status/track/reader.go b/cmd/oras/internal/display/status/track/reader.go index 456cc4dfd..0a1e046d9 100644 --- a/cmd/oras/internal/display/status/track/reader.go +++ b/cmd/oras/internal/display/status/track/reader.go @@ -46,13 +46,13 @@ func NewReader(r io.Reader, descriptor ocispec.Descriptor, actionPrompt string, } func managedReader(r io.Reader, descriptor ocispec.Descriptor, manager track.Manager) (*reader, error) { - tracker, err := track.NewReadTracker(manager, descriptor, r) + tracker, err := manager.Track(descriptor) if err != nil { return nil, err } return &reader{ - ReadTracker: tracker, + ReadTracker: track.NewReadTracker(tracker, r), manager: manager, }, nil } diff --git a/internal/experimental/track/reader.go b/internal/experimental/track/reader.go index a5533f1eb..8b0ac862d 100644 --- a/internal/experimental/track/reader.go +++ b/internal/experimental/track/reader.go @@ -2,8 +2,6 @@ package track import ( "io" - - ocispec "github.com/opencontainers/image-spec/specs-go/v1" ) // ReadTracker tracks the transmission based on the read operation. @@ -13,16 +11,12 @@ type ReadTracker struct { offset int64 } -// NewReadTracker returns a new ReadTracker. -func NewReadTracker(manager Manager, descriptor ocispec.Descriptor, r io.Reader) (*ReadTracker, error) { - tracker, err := manager.Track(descriptor) - if err != nil { - return nil, err - } +// NewReadTracker attaches a tracker to a reader. +func NewReadTracker(track Tracker, r io.Reader) *ReadTracker { return &ReadTracker{ base: r, - tracker: tracker, - }, nil + tracker: track, + } } // Read reads from the base reader and updates the status. From e3f8185bac735cc791126bd53fd03d1a11132af9 Mon Sep 17 00:00:00 2001 From: Shiwei Zhang Date: Fri, 1 Nov 2024 12:44:28 +0800 Subject: [PATCH 06/15] nit: formatting Signed-off-by: Shiwei Zhang --- internal/experimental/track/reader.go | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/internal/experimental/track/reader.go b/internal/experimental/track/reader.go index 8b0ac862d..d53da0374 100644 --- a/internal/experimental/track/reader.go +++ b/internal/experimental/track/reader.go @@ -1,8 +1,6 @@ package track -import ( - "io" -) +import "io" // ReadTracker tracks the transmission based on the read operation. type ReadTracker struct { From 933b8c6630c7add855ae5faa24817e0f72bb0db3 Mon Sep 17 00:00:00 2001 From: Shiwei Zhang Date: Mon, 30 Dec 2024 14:26:33 +0800 Subject: [PATCH 07/15] feat: import oras progress package Signed-off-by: Shiwei Zhang --- internal/progress/example_test.go | 83 +++++++++++++++++ internal/progress/manager.go | 48 ++++++++++ internal/progress/status.go | 40 ++++++++ internal/progress/tracker.go | 150 ++++++++++++++++++++++++++++++ 4 files changed, 321 insertions(+) create mode 100644 internal/progress/example_test.go create mode 100644 internal/progress/manager.go create mode 100644 internal/progress/status.go create mode 100644 internal/progress/tracker.go diff --git a/internal/progress/example_test.go b/internal/progress/example_test.go new file mode 100644 index 000000000..22da22bfd --- /dev/null +++ b/internal/progress/example_test.go @@ -0,0 +1,83 @@ +/* +Copyright The ORAS Authors. +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package progress_test + +import ( + "crypto/rand" + "fmt" + "io" + + "oras.land/oras/internal/progress" +) + +// ExampleTrackReader demonstrates how to track the transmission progress of a +// reader. +func ExampleTrackReader() { + // Set up a progress tracker. + total := int64(11) + tracker := progress.TrackerFunc(func(status progress.Status, err error) error { + if err != nil { + fmt.Printf("Error: %v\n", err) + return nil + } + switch status.State { + case progress.StateInitialized: + fmt.Println("Start reading content") + case progress.StateTransmitting: + fmt.Printf("Progress: %d/%d bytes\n", status.Offset, total) + case progress.StateTransmitted: + fmt.Println("Finish reading content") + default: + // Ignore other states. + } + return nil + }) + // Close takes no effect for TrackerFunc but should be called for general + // Tracker implementations. + defer tracker.Close() + + // Wrap a reader of a random content generator with the progress tracker. + r := io.LimitReader(rand.Reader, total) + rc := progress.TrackReader(tracker, r) + + // Start tracking the transmission. + if err := progress.Start(tracker); err != nil { + panic(err) + } + + // Read from the random content generator and discard the content, while + // tracking the progress. + // Note: io.Discard is wrapped with a io.MultiWriter for dropping + // the io.ReadFrom interface for demonstration purposes. + buf := make([]byte, 3) + w := io.MultiWriter(io.Discard) + if _, err := io.CopyBuffer(w, rc, buf); err != nil { + panic(err) + } + + // Finish tracking the transmission. + if err := progress.Done(tracker); err != nil { + panic(err) + } + + // Output: + // Start reading content + // Progress: 3/11 bytes + // Progress: 6/11 bytes + // Progress: 9/11 bytes + // Progress: 11/11 bytes + // Finish reading content +} diff --git a/internal/progress/manager.go b/internal/progress/manager.go new file mode 100644 index 000000000..439b90d18 --- /dev/null +++ b/internal/progress/manager.go @@ -0,0 +1,48 @@ +/* +Copyright The ORAS Authors. +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package progress tracks the status of descriptors being processed. +package progress + +import ( + "io" + + ocispec "github.com/opencontainers/image-spec/specs-go/v1" +) + +// Manager tracks the progress of multiple descriptors. +type Manager interface { + io.Closer + + // Track starts tracking the progress of a descriptor. + Track(desc ocispec.Descriptor) (Tracker, error) +} + +// ManagerFunc is an adapter to allow the use of ordinary functions as Managers. +// If f is a function with the appropriate signature, ManagerFunc(f) is a +// [Manager] that calls f. +type ManagerFunc func(desc ocispec.Descriptor, status Status, err error) error + +// Close closes the manager. +func (f ManagerFunc) Close() error { + return nil +} + +// Track starts tracking the progress of a descriptor. +func (f ManagerFunc) Track(desc ocispec.Descriptor) (Tracker, error) { + return TrackerFunc(func(status Status, err error) error { + return f(desc, status, err) + }), nil +} diff --git a/internal/progress/status.go b/internal/progress/status.go new file mode 100644 index 000000000..e6c4d1cb9 --- /dev/null +++ b/internal/progress/status.go @@ -0,0 +1,40 @@ +/* +Copyright The ORAS Authors. +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package progress + +// State represents the state of a descriptor. +type State int + +// Registered states. +const ( + StateUnknown State = iota // unknown state + StateInitialized // progress initialized + StateTransmitting // transmitting content + StateTransmitted // content transmitted + StateExists // content exists + StateSkipped // content skipped + StateMounted // content mounted +) + +// Status represents the status of a descriptor. +type Status struct { + // State represents the state of the descriptor. + State State + + // Offset represents the current offset of the descriptor. + // Offset is discarded if set to a negative value. + Offset int64 +} diff --git a/internal/progress/tracker.go b/internal/progress/tracker.go new file mode 100644 index 000000000..58ee8f624 --- /dev/null +++ b/internal/progress/tracker.go @@ -0,0 +1,150 @@ +/* +Copyright The ORAS Authors. +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package progress + +import "io" + +// Tracker updates the status of a descriptor. +type Tracker interface { + io.Closer + + // Update updates the status of the descriptor. + Update(status Status) error + + // Fail marks the descriptor as failed. + // Fail should return nil on successful failure marking. + Fail(err error) error +} + +// TrackerFunc is an adapter to allow the use of ordinary functions as Trackers. +// If f is a function with the appropriate signature, TrackerFunc(f) is a +// [Tracker] that calls f. +type TrackerFunc func(status Status, err error) error + +// Close closes the tracker. +func (f TrackerFunc) Close() error { + return nil +} + +// Update updates the status of the descriptor. +func (f TrackerFunc) Update(status Status) error { + return f(status, nil) +} + +// Fail marks the descriptor as failed. +func (f TrackerFunc) Fail(err error) error { + return f(Status{}, err) +} + +// Start starts tracking the transmission. +func Start(t Tracker) error { + return t.Update(Status{ + State: StateInitialized, + Offset: -1, + }) +} + +// Done marks the transmission as complete. +// Done should be called after the transmission is complete. +// Note: Reading all content from the reader does not imply the transmission is +// complete. +func Done(t Tracker) error { + return t.Update(Status{ + State: StateTransmitted, + Offset: -1, + }) +} + +// TrackReader bind a reader with a tracker. +func TrackReader(t Tracker, r io.Reader) io.Reader { + rt := readTracker{ + base: r, + tracker: t, + } + if _, ok := r.(io.WriterTo); ok { + return &readTrackerWriteTo{rt} + } + return &rt +} + +// readTracker tracks the transmission based on the read operation. +type readTracker struct { + base io.Reader + tracker Tracker + offset int64 +} + +// Read reads from the base reader and updates the status. +func (rt *readTracker) Read(p []byte) (int, error) { + n, err := rt.base.Read(p) + rt.offset += int64(n) + if n > 0 { + if updateErr := rt.tracker.Update(Status{ + State: StateTransmitting, + Offset: rt.offset, + }); updateErr != nil { + return n, updateErr + } + } + if err != nil && err != io.EOF { + if failErr := rt.tracker.Fail(err); failErr != nil { + return n, failErr + } + } + return n, err +} + +// readTrackerWriteTo is readTracker with WriteTo support. +type readTrackerWriteTo struct { + readTracker +} + +// WriteTo writes to the base writer and updates the status. +func (rt *readTrackerWriteTo) WriteTo(w io.Writer) (int64, error) { + wt := &writeTracker{ + base: w, + tracker: rt.tracker, + offset: rt.offset, + } + n, err := rt.base.(io.WriterTo).WriteTo(wt) + rt.offset = wt.offset + return n, err +} + +// writeTracker tracks the transmission based on the write operation. +type writeTracker struct { + base io.Writer + tracker Tracker + offset int64 +} + +// Write writes to the base writer and updates the status. +func (wt *writeTracker) Write(p []byte) (int, error) { + n, err := wt.base.Write(p) + wt.offset += int64(n) + if n > 0 { + if updateErr := wt.tracker.Update(Status{ + State: StateTransmitting, + Offset: wt.offset, + }); updateErr != nil { + return n, updateErr + } + } + if err != nil { + return n, wt.tracker.Fail(err) + } + return n, nil +} From 04f7c9dd10f9afe03ff780d71695de057e57d7cb Mon Sep 17 00:00:00 2001 From: Shiwei Zhang Date: Mon, 30 Dec 2024 15:20:46 +0800 Subject: [PATCH 08/15] feat!: migrate to the new progress package Signed-off-by: Shiwei Zhang --- .../display/status/progress/manager.go | 10 +-- .../display/status/progress/messenger.go | 10 +-- .../internal/display/status/track/reader.go | 28 +++++---- .../internal/display/status/track/target.go | 43 ++++++++----- cmd/oras/internal/display/status/tty.go | 58 +++++++++-------- cmd/oras/root/blob/fetch.go | 9 ++- cmd/oras/root/blob/push.go | 11 ++-- cmd/oras/root/cp.go | 45 -------------- internal/experimental/track/interface.go | 62 ------------------- internal/experimental/track/reader.go | 56 ----------------- 10 files changed, 98 insertions(+), 234 deletions(-) delete mode 100644 internal/experimental/track/interface.go delete mode 100644 internal/experimental/track/reader.go diff --git a/cmd/oras/internal/display/status/progress/manager.go b/cmd/oras/internal/display/status/progress/manager.go index db5981f03..19a627b4a 100644 --- a/cmd/oras/internal/display/status/progress/manager.go +++ b/cmd/oras/internal/display/status/progress/manager.go @@ -23,7 +23,7 @@ import ( ocispec "github.com/opencontainers/image-spec/specs-go/v1" "oras.land/oras/cmd/oras/internal/display/status/console" - "oras.land/oras/internal/experimental/track" + "oras.land/oras/internal/progress" ) const ( @@ -42,11 +42,11 @@ type manager struct { updating sync.WaitGroup renderDone chan struct{} renderClosed chan struct{} - prompt map[track.State]string + prompt map[progress.State]string } // NewManager initialized a new progress manager. -func NewManager(tty *os.File, prompt map[track.State]string) (track.Manager, error) { +func NewManager(tty *os.File, prompt map[progress.State]string) (progress.Manager, error) { c, err := console.NewConsole(tty) if err != nil { return nil, err @@ -99,7 +99,7 @@ func (m *manager) render() { } // Track appends a new status with 2-line space for rendering. -func (m *manager) Track(desc ocispec.Descriptor) (track.Tracker, error) { +func (m *manager) Track(desc ocispec.Descriptor) (progress.Tracker, error) { if m.closed() { return nil, errManagerStopped } @@ -114,7 +114,7 @@ func (m *manager) Track(desc ocispec.Descriptor) (track.Tracker, error) { return m.statusChan(s, desc), nil } -func (m *manager) statusChan(s *status, desc ocispec.Descriptor) track.Tracker { +func (m *manager) statusChan(s *status, desc ocispec.Descriptor) progress.Tracker { ch := make(chan *status, BufferSize) m.updating.Add(1) go func() { diff --git a/cmd/oras/internal/display/status/progress/messenger.go b/cmd/oras/internal/display/status/progress/messenger.go index 37da92314..8d1e201ec 100644 --- a/cmd/oras/internal/display/status/progress/messenger.go +++ b/cmd/oras/internal/display/status/progress/messenger.go @@ -20,7 +20,7 @@ import ( ocispec "github.com/opencontainers/image-spec/specs-go/v1" "oras.land/oras/cmd/oras/internal/display/status/progress/humanize" - "oras.land/oras/internal/experimental/track" + "oras.land/oras/internal/progress" ) // Messenger is progress message channel. @@ -28,11 +28,11 @@ type Messenger struct { ch chan *status closed bool desc ocispec.Descriptor - prompt map[track.State]string + prompt map[progress.State]string } -func (m *Messenger) Update(status track.Status) error { - if status.State == track.StateInitialized { +func (m *Messenger) Update(status progress.Status) error { + if status.State == progress.StateInitialized { m.start() } m.send(m.prompt[status.State], status.Offset) @@ -40,7 +40,7 @@ func (m *Messenger) Update(status track.Status) error { } func (m *Messenger) Fail(err error) error { - return err + return nil } func (m *Messenger) Close() error { diff --git a/cmd/oras/internal/display/status/track/reader.go b/cmd/oras/internal/display/status/track/reader.go index 0a1e046d9..1f2070a00 100644 --- a/cmd/oras/internal/display/status/track/reader.go +++ b/cmd/oras/internal/display/status/track/reader.go @@ -20,45 +20,47 @@ import ( "os" ocispec "github.com/opencontainers/image-spec/specs-go/v1" - "oras.land/oras/cmd/oras/internal/display/status/progress" - "oras.land/oras/internal/experimental/track" + sprogress "oras.land/oras/cmd/oras/internal/display/status/progress" + "oras.land/oras/internal/progress" ) type reader struct { - *track.ReadTracker + io.Reader + progress.Tracker - manager track.Manager + manager progress.Manager } // NewReader returns a new reader with tracked progress. func NewReader(r io.Reader, descriptor ocispec.Descriptor, actionPrompt string, donePrompt string, tty *os.File) (*reader, error) { - prompt := map[track.State]string{ - track.StateInitialized: actionPrompt, - track.StateTransmitting: actionPrompt, - track.StateTransmitted: donePrompt, + prompt := map[progress.State]string{ + progress.StateInitialized: actionPrompt, + progress.StateTransmitting: actionPrompt, + progress.StateTransmitted: donePrompt, } - manager, err := progress.NewManager(tty, prompt) + manager, err := sprogress.NewManager(tty, prompt) if err != nil { return nil, err } return managedReader(r, descriptor, manager) } -func managedReader(r io.Reader, descriptor ocispec.Descriptor, manager track.Manager) (*reader, error) { +func managedReader(r io.Reader, descriptor ocispec.Descriptor, manager progress.Manager) (*reader, error) { tracker, err := manager.Track(descriptor) if err != nil { return nil, err } return &reader{ - ReadTracker: track.NewReadTracker(tracker, r), - manager: manager, + Reader: progress.TrackReader(tracker, r), + Tracker: tracker, + manager: manager, }, nil } // StopManager stops the messenger channel and related manager. func (r *reader) StopManager() { - r.Close() + _ = r.Tracker.Close() _ = r.manager.Close() } diff --git a/cmd/oras/internal/display/status/track/target.go b/cmd/oras/internal/display/status/track/target.go index 7c5421506..50dd3747d 100644 --- a/cmd/oras/internal/display/status/track/target.go +++ b/cmd/oras/internal/display/status/track/target.go @@ -25,20 +25,20 @@ import ( "oras.land/oras-go/v2" "oras.land/oras-go/v2/errdef" "oras.land/oras-go/v2/registry" - "oras.land/oras/cmd/oras/internal/display/status/progress" - "oras.land/oras/internal/experimental/track" + sprogress "oras.land/oras/cmd/oras/internal/display/status/progress" + "oras.land/oras/internal/progress" ) // GraphTarget is a tracked oras.GraphTarget. type GraphTarget interface { oras.GraphTarget io.Closer - Report(desc ocispec.Descriptor, state track.State) error + Report(desc ocispec.Descriptor, state progress.State) error } type graphTarget struct { oras.GraphTarget - manager track.Manager + manager progress.Manager } type referenceGraphTarget struct { @@ -46,8 +46,8 @@ type referenceGraphTarget struct { } // NewTarget creates a new tracked Target. -func NewTarget(t oras.GraphTarget, prompt map[track.State]string, tty *os.File) (GraphTarget, error) { - manager, err := progress.NewManager(tty, prompt) +func NewTarget(t oras.GraphTarget, prompt map[progress.State]string, tty *os.File) (GraphTarget, error) { + manager, err := sprogress.NewManager(tty, prompt) if err != nil { return nil, err } @@ -78,16 +78,19 @@ func (t *graphTarget) Push(ctx context.Context, expected ocispec.Descriptor, con return err } defer r.Close() - r.Start() + if err := progress.Start(r); err != nil { + return err + } if err := t.GraphTarget.Push(ctx, expected, r); err != nil { if errors.Is(err, errdef.ErrAlreadyExists) { // allowed error types in oras-go oci and memory store - r.Done() + if err := progress.Done(r); err != nil { + return err + } } return err } - r.Done() - return nil + return progress.Done(r) } // PushReference pushes the content to the base oras.GraphTarget with tracking. @@ -97,13 +100,14 @@ func (rgt *referenceGraphTarget) PushReference(ctx context.Context, expected oci return err } defer r.Close() - r.Start() + if err := progress.Start(r); err != nil { + return err + } err = rgt.GraphTarget.(registry.ReferencePusher).PushReference(ctx, expected, r, reference) if err != nil { return err } - r.Done() - return nil + return progress.Done(r) } // Close closes the tracking manager. @@ -112,9 +116,16 @@ func (t *graphTarget) Close() error { } // Report prompts the user with the provided state and descriptor. -func (t *graphTarget) Report(desc ocispec.Descriptor, state track.State) error { - return track.Record(t.manager, desc, track.Status{ +func (t *graphTarget) Report(desc ocispec.Descriptor, state progress.State) error { + tracker, err := t.manager.Track(desc) + if err != nil { + return err + } + if err = tracker.Update(progress.Status{ State: state, Offset: desc.Size, - }) + }); err != nil { + return err + } + return tracker.Close() } diff --git a/cmd/oras/internal/display/status/tty.go b/cmd/oras/internal/display/status/tty.go index 304a4faae..adf1b8f39 100644 --- a/cmd/oras/internal/display/status/tty.go +++ b/cmd/oras/internal/display/status/tty.go @@ -25,14 +25,14 @@ import ( ocispec "github.com/opencontainers/image-spec/specs-go/v1" "oras.land/oras-go/v2" "oras.land/oras-go/v2/content" - strack "oras.land/oras/cmd/oras/internal/display/status/track" - "oras.land/oras/internal/experimental/track" + "oras.land/oras/cmd/oras/internal/display/status/track" + "oras.land/oras/internal/progress" ) // TTYPushHandler handles TTY status output for push command. type TTYPushHandler struct { tty *os.File - tracked strack.GraphTarget + tracked track.GraphTarget committed *sync.Map fetcher content.Fetcher } @@ -58,13 +58,13 @@ func (ph *TTYPushHandler) OnEmptyArtifact() error { // TrackTarget returns a tracked target. func (ph *TTYPushHandler) TrackTarget(gt oras.GraphTarget) (oras.GraphTarget, StopTrackTargetFunc, error) { - prompt := map[track.State]string{ - track.StateInitialized: PushPromptUploading, - track.StateTransmitting: PushPromptUploading, - track.StateTransmitted: PushPromptUploaded, - track.StateExists: PushPromptExists, + prompt := map[progress.State]string{ + progress.StateInitialized: PushPromptUploading, + progress.StateTransmitting: PushPromptUploading, + progress.StateTransmitted: PushPromptUploaded, + progress.StateExists: PushPromptExists, } - tracked, err := strack.NewTarget(gt, prompt, ph.tty) + tracked, err := track.NewTarget(gt, prompt, ph.tty) if err != nil { return nil, nil, err } @@ -75,7 +75,7 @@ func (ph *TTYPushHandler) TrackTarget(gt oras.GraphTarget) (oras.GraphTarget, St // OnCopySkipped is called when an object already exists. func (ph *TTYPushHandler) OnCopySkipped(_ context.Context, desc ocispec.Descriptor) error { ph.committed.Store(desc.Digest.String(), desc.Annotations[ocispec.AnnotationTitle]) - return ph.tracked.Report(desc, track.StateExists) + return ph.tracked.Report(desc, progress.StateExists) } // PreCopy implements PreCopy of CopyHandler. @@ -91,7 +91,7 @@ func (ph *TTYPushHandler) PostCopy(ctx context.Context, desc ocispec.Descriptor) return err } for _, successor := range successors { - if err = ph.tracked.Report(successor, track.StateSkipped); err != nil { + if err = ph.tracked.Report(successor, progress.StateSkipped); err != nil { return err } } @@ -106,7 +106,7 @@ func NewTTYAttachHandler(tty *os.File, fetcher content.Fetcher) AttachHandler { // TTYPullHandler handles TTY status output for pull events. type TTYPullHandler struct { tty *os.File - tracked strack.GraphTarget + tracked track.GraphTarget } // NewTTYPullHandler returns a new handler for Pull status events. @@ -133,24 +133,24 @@ func (ph *TTYPullHandler) OnNodeProcessing(_ ocispec.Descriptor) error { // OnNodeRestored implements PullHandler. func (ph *TTYPullHandler) OnNodeRestored(desc ocispec.Descriptor) error { - return ph.tracked.Report(desc, track.StateMounted) + return ph.tracked.Report(desc, progress.StateMounted) } // OnNodeSkipped implements PullHandler. func (ph *TTYPullHandler) OnNodeSkipped(desc ocispec.Descriptor) error { - return ph.tracked.Report(desc, track.StateSkipped) + return ph.tracked.Report(desc, progress.StateSkipped) } // TrackTarget returns a tracked target. func (ph *TTYPullHandler) TrackTarget(gt oras.GraphTarget) (oras.GraphTarget, StopTrackTargetFunc, error) { - prompt := map[track.State]string{ - track.StateInitialized: PullPromptDownloading, - track.StateTransmitting: PullPromptDownloading, - track.StateTransmitted: PullPromptPulled, - track.StateSkipped: PullPromptSkipped, - track.StateMounted: PullPromptRestored, + prompt := map[progress.State]string{ + progress.StateInitialized: PullPromptDownloading, + progress.StateTransmitting: PullPromptDownloading, + progress.StateTransmitted: PullPromptPulled, + progress.StateSkipped: PullPromptSkipped, + progress.StateMounted: PullPromptRestored, } - tracked, err := strack.NewTarget(gt, prompt, ph.tty) + tracked, err := track.NewTarget(gt, prompt, ph.tty) if err != nil { return nil, nil, err } @@ -174,8 +174,16 @@ func NewTTYCopyHandler(tty *os.File) CopyHandler { // StartTracking returns a tracked target from a graph target. func (ch *TTYCopyHandler) StartTracking(gt oras.GraphTarget) (oras.GraphTarget, error) { + prompt := map[progress.State]string{ + progress.StateInitialized: copyPromptCopying, + progress.StateTransmitting: copyPromptCopying, + progress.StateTransmitted: copyPromptCopied, + progress.StateExists: copyPromptExists, + progress.StateSkipped: copyPromptSkipped, + progress.StateMounted: copyPromptMounted, + } var err error - ch.tracked, err = track.NewTarget(gt, copyPromptCopying, copyPromptCopied, ch.tty) + ch.tracked, err = track.NewTarget(gt, prompt, ch.tty) if err != nil { return nil, err } @@ -190,7 +198,7 @@ func (ch *TTYCopyHandler) StopTracking() error { // OnCopySkipped is called when an object already exists. func (ch *TTYCopyHandler) OnCopySkipped(_ context.Context, desc ocispec.Descriptor) error { ch.committed.Store(desc.Digest.String(), desc.Annotations[ocispec.AnnotationTitle]) - return ch.tracked.Prompt(desc, copyPromptExists) + return ch.tracked.Report(desc, progress.StateExists) } // PreCopy implements PreCopy of CopyHandler. @@ -206,7 +214,7 @@ func (ch *TTYCopyHandler) PostCopy(ctx context.Context, desc ocispec.Descriptor) return err } for _, successor := range successors { - if err = ch.tracked.Prompt(successor, copyPromptSkipped); err != nil { + if err = ch.tracked.Report(successor, progress.StateSkipped); err != nil { return err } } @@ -216,5 +224,5 @@ func (ch *TTYCopyHandler) PostCopy(ctx context.Context, desc ocispec.Descriptor) // OnMounted implements OnMounted of CopyHandler. func (ch *TTYCopyHandler) OnMounted(_ context.Context, desc ocispec.Descriptor) error { ch.committed.Store(desc.Digest.String(), desc.Annotations[ocispec.AnnotationTitle]) - return ch.tracked.Prompt(desc, copyPromptMounted) + return ch.tracked.Report(desc, progress.StateMounted) } diff --git a/cmd/oras/root/blob/fetch.go b/cmd/oras/root/blob/fetch.go index 44694c428..a41af6ecc 100644 --- a/cmd/oras/root/blob/fetch.go +++ b/cmd/oras/root/blob/fetch.go @@ -31,6 +31,7 @@ import ( "oras.land/oras/cmd/oras/internal/display/status/track" oerrors "oras.land/oras/cmd/oras/internal/errors" "oras.land/oras/cmd/oras/internal/option" + "oras.land/oras/internal/progress" ) type fetchBlobOptions struct { @@ -176,11 +177,15 @@ func (opts *fetchBlobOptions) doFetch(ctx context.Context, src oras.ReadOnlyTarg return ocispec.Descriptor{}, err } defer trackedReader.StopManager() - trackedReader.Start() + if err := progress.Start(trackedReader); err != nil { + return ocispec.Descriptor{}, err + } if _, err = io.Copy(writer, trackedReader); err != nil { return ocispec.Descriptor{}, err } - trackedReader.Done() + if err := progress.Done(trackedReader); err != nil { + return ocispec.Descriptor{}, err + } } if err := vr.Verify(); err != nil { return ocispec.Descriptor{}, err diff --git a/cmd/oras/root/blob/push.go b/cmd/oras/root/blob/push.go index 883de2fe5..b5bb4fac6 100644 --- a/cmd/oras/root/blob/push.go +++ b/cmd/oras/root/blob/push.go @@ -31,6 +31,7 @@ import ( "oras.land/oras/cmd/oras/internal/option" "oras.land/oras/cmd/oras/internal/output" "oras.land/oras/internal/file" + "oras.land/oras/internal/progress" ) type pushBlobOptions struct { @@ -164,11 +165,11 @@ func (opts *pushBlobOptions) doPush(ctx context.Context, printer *output.Printer return err } defer trackedReader.StopManager() - trackedReader.Start() - r = trackedReader - if err := t.Push(ctx, desc, r); err != nil { + if err := progress.Start(trackedReader); err != nil { return err } - trackedReader.Done() - return nil + if err := t.Push(ctx, desc, trackedReader); err != nil { + return err + } + return progress.Done(trackedReader) } diff --git a/cmd/oras/root/cp.go b/cmd/oras/root/cp.go index 2a5cc5cd4..c6693edcb 100644 --- a/cmd/oras/root/cp.go +++ b/cmd/oras/root/cp.go @@ -34,11 +34,9 @@ import ( "oras.land/oras/cmd/oras/internal/command" "oras.land/oras/cmd/oras/internal/display" "oras.land/oras/cmd/oras/internal/display/status" - strack "oras.land/oras/cmd/oras/internal/display/status/track" oerrors "oras.land/oras/cmd/oras/internal/errors" "oras.land/oras/cmd/oras/internal/option" "oras.land/oras/internal/docker" - "oras.land/oras/internal/experimental/track" "oras.land/oras/internal/graph" "oras.land/oras/internal/listener" "oras.land/oras/internal/registryutil" @@ -177,49 +175,6 @@ func doCopy(ctx context.Context, copyHandler status.CopyHandler, src oras.ReadOn dst, err = copyHandler.StartTracking(dst) if err != nil { return desc, err - if opts.TTY == nil { - // no TTY output - extendedCopyOptions.OnCopySkipped = copyHandler.OnCopySkipped - extendedCopyOptions.PreCopy = copyHandler.PreCopy - extendedCopyOptions.PostCopy = copyHandler.PostCopy - extendedCopyOptions.OnMounted = copyHandler.OnMounted - } else { - // TTY output - prompt := map[track.State]string{ - track.StateInitialized: promptCopying, - track.StateTransmitting: promptCopying, - track.StateTransmitted: promptCopied, - track.StateExists: promptExists, - track.StateSkipped: promptSkipped, - track.StateMounted: promptMounted, - } - tracked, err := strack.NewTarget(dst, prompt, opts.TTY) - if err != nil { - return ocispec.Descriptor{}, err - } - defer tracked.Close() - dst = tracked - extendedCopyOptions.OnCopySkipped = func(ctx context.Context, desc ocispec.Descriptor) error { - committed.Store(desc.Digest.String(), desc.Annotations[ocispec.AnnotationTitle]) - return tracked.Report(desc, track.StateExists) - } - extendedCopyOptions.PostCopy = func(ctx context.Context, desc ocispec.Descriptor) error { - committed.Store(desc.Digest.String(), desc.Annotations[ocispec.AnnotationTitle]) - successors, err := graph.FilteredSuccessors(ctx, desc, tracked, status.DeduplicatedFilter(committed)) - if err != nil { - return err - } - for _, successor := range successors { - if err = tracked.Report(successor, track.StateSkipped); err != nil { - return err - } - } - return nil - } - extendedCopyOptions.OnMounted = func(ctx context.Context, desc ocispec.Descriptor) error { - committed.Store(desc.Digest.String(), desc.Annotations[ocispec.AnnotationTitle]) - return tracked.Report(desc, track.StateMounted) - } } defer func() { stopErr := copyHandler.StopTracking() diff --git a/internal/experimental/track/interface.go b/internal/experimental/track/interface.go deleted file mode 100644 index 972d8d519..000000000 --- a/internal/experimental/track/interface.go +++ /dev/null @@ -1,62 +0,0 @@ -package track - -import ( - "io" - - ocispec "github.com/opencontainers/image-spec/specs-go/v1" -) - -// State represents the state of a descriptor. -type State int - -const ( - StateUnknown State = iota - StateInitialized - StateTransmitting - StateTransmitted - StateExists - StateSkipped - StateMounted -) - -// Status represents the status of a descriptor. -type Status struct { - // State represents the state of the descriptor. - State State - - // Offset represents the current offset of the descriptor. - // Offset is discarded if set to a negative value. - Offset int64 -} - -// Tracker updates the status of a descriptor. -type Tracker interface { - io.Closer - - // Update updates the status of the descriptor. - Update(status Status) error - - // Fail marks the descriptor as failed. - Fail(err error) error -} - -// Manager tracks the progress of multiple descriptors. -type Manager interface { - io.Closer - - // Track starts tracking the progress of a descriptor. - Track(desc ocispec.Descriptor) (Tracker, error) -} - -// Record adds the progress of a descriptor as a single entry. -func Record(m Manager, desc ocispec.Descriptor, status Status) error { - tracker, err := m.Track(desc) - if err != nil { - return err - } - err = tracker.Update(status) - if err != nil { - return err - } - return tracker.Close() -} diff --git a/internal/experimental/track/reader.go b/internal/experimental/track/reader.go deleted file mode 100644 index d53da0374..000000000 --- a/internal/experimental/track/reader.go +++ /dev/null @@ -1,56 +0,0 @@ -package track - -import "io" - -// ReadTracker tracks the transmission based on the read operation. -type ReadTracker struct { - base io.Reader - tracker Tracker - offset int64 -} - -// NewReadTracker attaches a tracker to a reader. -func NewReadTracker(track Tracker, r io.Reader) *ReadTracker { - return &ReadTracker{ - base: r, - tracker: track, - } -} - -// Read reads from the base reader and updates the status. -func (rt *ReadTracker) Read(p []byte) (n int, err error) { - n, err = rt.base.Read(p) - rt.offset += int64(n) - _ = rt.tracker.Update(Status{ - State: StateTransmitting, - Offset: rt.offset, - }) - if err != nil && err != io.EOF { - _ = rt.tracker.Fail(err) - } - return n, err -} - -// Close closes the tracker. -func (rt *ReadTracker) Close() error { - return rt.tracker.Close() -} - -// Start starts tracking the transmission. -func (rt *ReadTracker) Start() error { - return rt.tracker.Update(Status{ - State: StateInitialized, - Offset: -1, - }) -} - -// Done marks the transmission as complete. -// Done should be called after the transmission is complete. -// Note: Reading all content from the reader does not imply the transmission is -// complete. -func (rt *ReadTracker) Done() error { - return rt.tracker.Update(Status{ - State: StateTransmitted, - Offset: -1, - }) -} From fa1d61a1279d23cf6807386f876274150620c07c Mon Sep 17 00:00:00 2001 From: Shiwei Zhang Date: Mon, 30 Dec 2024 17:20:01 +0800 Subject: [PATCH 09/15] fix: fix bugs and tests Signed-off-by: Shiwei Zhang --- .../display/status/progress/manager.go | 2 ++ .../display/status/progress/manager_test.go | 3 +- .../display/status/progress/messenger_test.go | 28 +++++++++++-------- .../display/status/track/target_test.go | 13 ++++++--- internal/testutils/prompt.go | 5 ++-- 5 files changed, 32 insertions(+), 19 deletions(-) diff --git a/cmd/oras/internal/display/status/progress/manager.go b/cmd/oras/internal/display/status/progress/manager.go index 19a627b4a..81b6b398c 100644 --- a/cmd/oras/internal/display/status/progress/manager.go +++ b/cmd/oras/internal/display/status/progress/manager.go @@ -55,6 +55,7 @@ func NewManager(tty *os.File, prompt map[progress.State]string) (progress.Manage console: c, renderDone: make(chan struct{}), renderClosed: make(chan struct{}), + prompt: prompt, } m.start() return m, nil @@ -105,6 +106,7 @@ func (m *manager) Track(desc ocispec.Descriptor) (progress.Tracker, error) { } s := newStatus() + s.descriptor = desc m.statusLock.Lock() m.status = append(m.status, s) m.statusLock.Unlock() diff --git a/cmd/oras/internal/display/status/progress/manager_test.go b/cmd/oras/internal/display/status/progress/manager_test.go index 43d0f2104..c8c2f9551 100644 --- a/cmd/oras/internal/display/status/progress/manager_test.go +++ b/cmd/oras/internal/display/status/progress/manager_test.go @@ -21,6 +21,7 @@ import ( "fmt" "testing" + ocispec "github.com/opencontainers/image-spec/specs-go/v1" "oras.land/oras/cmd/oras/internal/display/status/console" "oras.land/oras/internal/testutils" ) @@ -41,7 +42,7 @@ func Test_manager_render(t *testing.T) { } height, _ := m.console.GetHeightWidth() for i := 0; i < height; i++ { - if _, err := m.Add(); err != nil { + if _, err := m.Track(ocispec.Descriptor{}); err != nil { t.Fatal(err) } } diff --git a/cmd/oras/internal/display/status/progress/messenger_test.go b/cmd/oras/internal/display/status/progress/messenger_test.go index a8b782e55..8ae5b385c 100644 --- a/cmd/oras/internal/display/status/progress/messenger_test.go +++ b/cmd/oras/internal/display/status/progress/messenger_test.go @@ -16,16 +16,24 @@ limitations under the License. package progress import ( - v1 "github.com/opencontainers/image-spec/specs-go/v1" "testing" + + ocispec "github.com/opencontainers/image-spec/specs-go/v1" ) func Test_Messenger(t *testing.T) { var msg *status ch := make(chan *status, BufferSize) - messenger := &Messenger{ch: ch} + desc := ocispec.Descriptor{ + Digest: "mouse", + Size: 100, + } + messenger := &Messenger{ + ch: ch, + desc: desc, + } - messenger.Start() + messenger.start() select { case msg = <-ch: if msg.offset != -1 { @@ -35,12 +43,8 @@ func Test_Messenger(t *testing.T) { t.Error("Expected start message") } - desc := v1.Descriptor{ - Digest: "mouse", - Size: 100, - } expected := int64(50) - messenger.Send("Reading", desc, expected) + messenger.send("Reading", expected) select { case msg = <-ch: if msg.offset != expected { @@ -53,8 +57,8 @@ func Test_Messenger(t *testing.T) { t.Error("Expected status message") } - messenger.Send("Reading", desc, expected) - messenger.Send("Read", desc, desc.Size) + messenger.send("Reading", expected) + messenger.send("Read", desc.Size) select { case msg = <-ch: if msg.offset != desc.Size { @@ -73,7 +77,7 @@ func Test_Messenger(t *testing.T) { } expected = int64(-1) - messenger.Stop() + messenger.stop() select { case msg = <-ch: if msg.offset != expected { @@ -83,7 +87,7 @@ func Test_Messenger(t *testing.T) { t.Error("Expected END status message") } - messenger.Stop() + messenger.stop() select { case msg = <-ch: if msg != nil { diff --git a/cmd/oras/internal/display/status/track/target_test.go b/cmd/oras/internal/display/status/track/target_test.go index 0630c8a9f..3e5fe62e3 100644 --- a/cmd/oras/internal/display/status/track/target_test.go +++ b/cmd/oras/internal/display/status/track/target_test.go @@ -30,6 +30,7 @@ import ( "oras.land/oras-go/v2/content/memory" "oras.land/oras-go/v2/errdef" "oras.land/oras-go/v2/registry/remote" + "oras.land/oras/internal/progress" "oras.land/oras/internal/testutils" ) @@ -62,9 +63,11 @@ func Test_referenceGraphTarget_PushReference(t *testing.T) { } // test tag := "tagged" - actionPrompt := "action" donePrompt := "done" - target, err := NewTarget(&testReferenceGraphTarget{src}, actionPrompt, donePrompt, device) + prompt := map[progress.State]string{ + progress.StateTransmitted: donePrompt, + } + target, err := NewTarget(&testReferenceGraphTarget{src}, prompt, device) if err != nil { t.Fatal(err) } @@ -108,9 +111,11 @@ func Test_graphTarget_Push_alreadyExists(t *testing.T) { t.Fatal("Failed to prepare test environment:", err) } // test - actionPrompt := "action" donePrompt := "done" - target, err := NewTarget(src, actionPrompt, donePrompt, device) + prompt := map[progress.State]string{ + progress.StateTransmitted: donePrompt, + } + target, err := NewTarget(src, prompt, device) if err != nil { t.Fatal(err) } diff --git a/internal/testutils/prompt.go b/internal/testutils/prompt.go index f40763d94..45bda6203 100644 --- a/internal/testutils/prompt.go +++ b/internal/testutils/prompt.go @@ -20,6 +20,7 @@ import ( ocispec "github.com/opencontainers/image-spec/specs-go/v1" "oras.land/oras-go/v2" + "oras.land/oras/internal/progress" ) // PromptDiscarder mocks trackable GraphTarget with discarded prompt. @@ -29,7 +30,7 @@ type PromptDiscarder struct { } // Prompt discards the prompt. -func (p *PromptDiscarder) Prompt(ocispec.Descriptor, string) error { +func (p *PromptDiscarder) Report(ocispec.Descriptor, progress.State) error { return nil } @@ -48,6 +49,6 @@ func NewErrorPrompt(err error) *ErrorPrompt { } // Prompt mocks an errored prompt. -func (e *ErrorPrompt) Prompt(ocispec.Descriptor, string) error { +func (e *ErrorPrompt) Report(ocispec.Descriptor, progress.State) error { return e.wanted } From d240656f8206ff0860323644d9018556a82475d3 Mon Sep 17 00:00:00 2001 From: Shiwei Zhang Date: Mon, 30 Dec 2024 17:57:56 +0800 Subject: [PATCH 10/15] fix: fix status Signed-off-by: Shiwei Zhang --- .../display/status/progress/manager.go | 8 ++-- .../display/status/progress/messenger.go | 45 +------------------ .../display/status/progress/messenger_test.go | 13 +++--- .../display/status/progress/status.go | 38 ++++++++++++++-- .../display/status/progress/status_test.go | 30 ++++++------- 5 files changed, 57 insertions(+), 77 deletions(-) diff --git a/cmd/oras/internal/display/status/progress/manager.go b/cmd/oras/internal/display/status/progress/manager.go index 81b6b398c..07d0e1044 100644 --- a/cmd/oras/internal/display/status/progress/manager.go +++ b/cmd/oras/internal/display/status/progress/manager.go @@ -105,18 +105,17 @@ func (m *manager) Track(desc ocispec.Descriptor) (progress.Tracker, error) { return nil, errManagerStopped } - s := newStatus() - s.descriptor = desc + s := newStatus(desc) m.statusLock.Lock() m.status = append(m.status, s) m.statusLock.Unlock() defer m.console.NewRow() defer m.console.NewRow() - return m.statusChan(s, desc), nil + return m.statusChan(s), nil } -func (m *manager) statusChan(s *status, desc ocispec.Descriptor) progress.Tracker { +func (m *manager) statusChan(s *status) progress.Tracker { ch := make(chan *status, BufferSize) m.updating.Add(1) go func() { @@ -127,7 +126,6 @@ func (m *manager) statusChan(s *status, desc ocispec.Descriptor) progress.Tracke }() return &Messenger{ ch: ch, - desc: desc, prompt: m.prompt, } } diff --git a/cmd/oras/internal/display/status/progress/messenger.go b/cmd/oras/internal/display/status/progress/messenger.go index 8d1e201ec..cb9de3fba 100644 --- a/cmd/oras/internal/display/status/progress/messenger.go +++ b/cmd/oras/internal/display/status/progress/messenger.go @@ -15,19 +15,12 @@ limitations under the License. package progress -import ( - "time" - - ocispec "github.com/opencontainers/image-spec/specs-go/v1" - "oras.land/oras/cmd/oras/internal/display/status/progress/humanize" - "oras.land/oras/internal/progress" -) +import "oras.land/oras/internal/progress" // Messenger is progress message channel. type Messenger struct { ch chan *status closed bool - desc ocispec.Descriptor prompt map[progress.State]string } @@ -60,7 +53,7 @@ func (m *Messenger) start() { func (m *Messenger) send(prompt string, offset int64) { for { select { - case m.ch <- newStatusMessage(prompt, m.desc, offset): + case m.ch <- newStatusMessage(prompt, offset): return case <-m.ch: // purge the channel until successfully pushed @@ -80,37 +73,3 @@ func (m *Messenger) stop() { close(m.ch) m.closed = true } - -// newStatus generates a base empty status. -func newStatus() *status { - return &status{ - offset: -1, - total: humanize.ToBytes(0), - speedWindow: newSpeedWindow(framePerSecond), - } -} - -// newStatusMessage generates a status for messaging. -func newStatusMessage(prompt string, descriptor ocispec.Descriptor, offset int64) *status { - return &status{ - prompt: prompt, - descriptor: descriptor, - offset: offset, - } -} - -// startTiming creates start timing message. -func startTiming() *status { - return &status{ - offset: -1, - startTime: time.Now(), - } -} - -// endTiming creates end timing message. -func endTiming() *status { - return &status{ - offset: -1, - endTime: time.Now(), - } -} diff --git a/cmd/oras/internal/display/status/progress/messenger_test.go b/cmd/oras/internal/display/status/progress/messenger_test.go index 8ae5b385c..e41ea25db 100644 --- a/cmd/oras/internal/display/status/progress/messenger_test.go +++ b/cmd/oras/internal/display/status/progress/messenger_test.go @@ -24,14 +24,8 @@ import ( func Test_Messenger(t *testing.T) { var msg *status ch := make(chan *status, BufferSize) - desc := ocispec.Descriptor{ - Digest: "mouse", - Size: 100, - } - messenger := &Messenger{ - ch: ch, - desc: desc, - } + + messenger := &Messenger{ch: ch} messenger.start() select { @@ -43,6 +37,9 @@ func Test_Messenger(t *testing.T) { t.Error("Expected start message") } + desc := ocispec.Descriptor{ + Size: 100, + } expected := int64(50) messenger.send("Reading", expected) select { diff --git a/cmd/oras/internal/display/status/progress/status.go b/cmd/oras/internal/display/status/progress/status.go index feb653834..7d35bc1b2 100644 --- a/cmd/oras/internal/display/status/progress/status.go +++ b/cmd/oras/internal/display/status/progress/status.go @@ -56,6 +56,40 @@ type status struct { lock sync.Mutex } +// newStatus generates a base empty status. +func newStatus(desc ocispec.Descriptor) *status { + return &status{ + descriptor: desc, + offset: -1, + total: humanize.ToBytes(desc.Size), + speedWindow: newSpeedWindow(framePerSecond), + } +} + +// newStatusMessage generates a status for messaging. +func newStatusMessage(prompt string, offset int64) *status { + return &status{ + prompt: prompt, + offset: offset, + } +} + +// startTiming creates start timing message. +func startTiming() *status { + return &status{ + offset: -1, + startTime: time.Now(), + } +} + +// endTiming creates end timing message. +func endTiming() *status { + return &status{ + offset: -1, + endTime: time.Now(), + } +} + func (s *status) isZero() bool { return s.offset < 0 && s.startTime.IsZero() && s.endTime.IsZero() } @@ -167,10 +201,6 @@ func (s *status) update(n *status) { if n.offset >= 0 { s.offset = n.offset - if n.descriptor.Size != s.descriptor.Size { - s.total = humanize.ToBytes(n.descriptor.Size) - } - s.descriptor = n.descriptor } if n.prompt != "" { s.prompt = n.prompt diff --git a/cmd/oras/internal/display/status/progress/status_test.go b/cmd/oras/internal/display/status/progress/status_test.go index 8542d651d..cc03f44af 100644 --- a/cmd/oras/internal/display/status/progress/status_test.go +++ b/cmd/oras/internal/display/status/progress/status_test.go @@ -29,22 +29,20 @@ import ( func Test_status_String(t *testing.T) { // zero status and progress - s := newStatus() + s := newStatus(ocispec.Descriptor{ + MediaType: "application/vnd.oci.empty.oras.test.v1+json", + Size: 2, + Digest: "sha256:44136fa355b3678a1146ad16f7e8649e94fb4fc21fe77e8310c060f61caaff8a", + }) if status, digest := s.String(console.MinWidth); status != zeroStatus || digest != zeroDigest { t.Errorf("status.String() = %v, %v, want %v, %v", status, digest, zeroStatus, zeroDigest) } // not done s.update(&status{ - prompt: "test", - descriptor: ocispec.Descriptor{ - MediaType: "application/vnd.oci.empty.oras.test.v1+json", - Size: 2, - Digest: "sha256:44136fa355b3678a1146ad16f7e8649e94fb4fc21fe77e8310c060f61caaff8a", - }, + prompt: "test", startTime: time.Now().Add(-time.Minute), offset: 0, - total: humanize.ToBytes(2), }) // full name statusStr, digestStr := s.String(120) @@ -70,22 +68,20 @@ func Test_status_String(t *testing.T) { func Test_status_String_zeroWidth(t *testing.T) { // zero status and progress - s := newStatus() + s := newStatus(ocispec.Descriptor{ + MediaType: "application/vnd.oci.empty.oras.test.v1+json", + Size: 0, + Digest: "sha256:e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855", + }) if status, digest := s.String(console.MinWidth); status != zeroStatus || digest != zeroDigest { t.Errorf("status.String() = %v, %v, want %v, %v", status, digest, zeroStatus, zeroDigest) } // not done s.update(&status{ - prompt: "test", - descriptor: ocispec.Descriptor{ - MediaType: "application/vnd.oci.empty.oras.test.v1+json", - Size: 0, - Digest: "sha256:e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855", - }, + prompt: "test", startTime: time.Now().Add(-time.Minute), offset: 0, - total: humanize.ToBytes(0), }) // not done statusStr, digestStr := s.String(120) @@ -105,7 +101,7 @@ func Test_status_String_zeroWidth(t *testing.T) { } func Test_status_durationString(t *testing.T) { // zero duration - s := newStatus() + s := newStatus(ocispec.Descriptor{}) if d := s.durationString(); d != zeroDuration { t.Errorf("status.durationString() = %v, want %v", d, zeroDuration) } From fccb56740864af60ed8c4bec8333d053cb23fc04 Mon Sep 17 00:00:00 2001 From: Shiwei Zhang Date: Tue, 31 Dec 2024 16:22:57 +0800 Subject: [PATCH 11/15] fix: fix tracker and add tests Signed-off-by: Shiwei Zhang --- internal/progress/tracker.go | 30 ++- internal/progress/tracker_test.go | 414 ++++++++++++++++++++++++++++++ 2 files changed, 437 insertions(+), 7 deletions(-) create mode 100644 internal/progress/tracker_test.go diff --git a/internal/progress/tracker.go b/internal/progress/tracker.go index 58ee8f624..0431a767e 100644 --- a/internal/progress/tracker.go +++ b/internal/progress/tracker.go @@ -88,6 +88,8 @@ type readTracker struct { } // Read reads from the base reader and updates the status. +// On partial read, the tracker treats it as two reads: a successful read with +// status update and a failed read with failure report. func (rt *readTracker) Read(p []byte) (int, error) { n, err := rt.base.Read(p) rt.offset += int64(n) @@ -96,7 +98,7 @@ func (rt *readTracker) Read(p []byte) (int, error) { State: StateTransmitting, Offset: rt.offset, }); updateErr != nil { - return n, updateErr + err = updateErr } } if err != nil && err != io.EOF { @@ -113,6 +115,8 @@ type readTrackerWriteTo struct { } // WriteTo writes to the base writer and updates the status. +// On partial write, the tracker treats it as two writes: a successful write +// with status update and a failed write with failure report. func (rt *readTrackerWriteTo) WriteTo(w io.Writer) (int64, error) { wt := &writeTracker{ base: w, @@ -121,17 +125,25 @@ func (rt *readTrackerWriteTo) WriteTo(w io.Writer) (int64, error) { } n, err := rt.base.(io.WriterTo).WriteTo(wt) rt.offset = wt.offset + if err != nil && wt.trackerErr == nil { + if failErr := rt.tracker.Fail(err); failErr != nil { + return n, failErr + } + } return n, err } // writeTracker tracks the transmission based on the write operation. type writeTracker struct { - base io.Writer - tracker Tracker - offset int64 + base io.Writer + tracker Tracker + offset int64 + trackerErr error } // Write writes to the base writer and updates the status. +// On partial write, the tracker treats it as two writes: a successful write +// with status update and a failed write with failure report. func (wt *writeTracker) Write(p []byte) (int, error) { n, err := wt.base.Write(p) wt.offset += int64(n) @@ -140,11 +152,15 @@ func (wt *writeTracker) Write(p []byte) (int, error) { State: StateTransmitting, Offset: wt.offset, }); updateErr != nil { - return n, updateErr + wt.trackerErr = updateErr + err = updateErr } } if err != nil { - return n, wt.tracker.Fail(err) + if failErr := wt.tracker.Fail(err); failErr != nil { + wt.trackerErr = failErr + return n, failErr + } } - return n, nil + return n, err } diff --git a/internal/progress/tracker_test.go b/internal/progress/tracker_test.go new file mode 100644 index 000000000..f4190cc0c --- /dev/null +++ b/internal/progress/tracker_test.go @@ -0,0 +1,414 @@ +/* +Copyright The ORAS Authors. +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package progress + +import ( + "bytes" + "errors" + "io" + "testing" +) + +func TestTrackerFunc_Close(t *testing.T) { + var f TrackerFunc + if err := f.Close(); err != nil { + t.Errorf("TrackerFunc.Close() error = %v, wantErr false", err) + } +} + +func TestTrackerFunc_Update(t *testing.T) { + wantStatus := Status{ + State: StateTransmitted, + Offset: 42, + } + var wantErr error + tracker := TrackerFunc(func(status Status, err error) error { + if status != wantStatus { + t.Errorf("TrackerFunc status = %v, want %v", status, wantStatus) + } + if err != nil { + t.Errorf("TrackerFunc err = %v, want nil", err) + } + return wantErr + }) + + if err := tracker.Update(wantStatus); err != wantErr { + t.Errorf("TrackerFunc.Update() error = %v, want %v", err, wantErr) + } + + wantErr = errors.New("fail to track") + if err := tracker.Update(wantStatus); err != wantErr { + t.Errorf("TrackerFunc.Update() error = %v, want %v", err, wantErr) + } +} + +func TestTrackerFunc_Fail(t *testing.T) { + reportErr := errors.New("fail to process") + var wantStatus Status + var wantErr error + tracker := TrackerFunc(func(status Status, err error) error { + if status != wantStatus { + t.Errorf("TrackerFunc status = %v, want %v", status, wantStatus) + } + if err != reportErr { + t.Errorf("TrackerFunc err = %v, want %v", err, reportErr) + } + return wantErr + }) + + if err := tracker.Fail(reportErr); err != wantErr { + t.Errorf("TrackerFunc.Fail() error = %v, want %v", err, wantErr) + } + + wantErr = errors.New("fail to track") + if err := tracker.Fail(reportErr); err != wantErr { + t.Errorf("TrackerFunc.Fail() error = %v, want %v", err, wantErr) + } +} + +func TestStart(t *testing.T) { + tests := []struct { + name string + t Tracker + wantErr bool + }{ + { + name: "successful report initialization", + t: TrackerFunc(func(status Status, err error) error { + if status.State != StateInitialized { + t.Errorf("expected state to be StateInitialized, got %v", status.State) + } + return nil + }), + }, + { + name: "fail to report initialization", + t: TrackerFunc(func(status Status, err error) error { + return errors.New("fail to track") + }), + wantErr: true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if err := Start(tt.t); (err != nil) != tt.wantErr { + t.Errorf("Start() error = %v, wantErr %v", err, tt.wantErr) + } + }) + } +} + +func TestDone(t *testing.T) { + tests := []struct { + name string + t Tracker + wantErr bool + }{ + { + name: "successful report initialization", + t: TrackerFunc(func(status Status, err error) error { + if status.State != StateTransmitted { + t.Errorf("expected state to be StateTransmitted, got %v", status.State) + } + return nil + }), + }, + { + name: "fail to report initialization", + t: TrackerFunc(func(status Status, err error) error { + return errors.New("fail to track") + }), + wantErr: true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if err := Done(tt.t); (err != nil) != tt.wantErr { + t.Errorf("Done() error = %v, wantErr %v", err, tt.wantErr) + } + }) + } +} + +func TestTrackReader(t *testing.T) { + const bufSize = 6 + content := []byte("hello world") + t.Run("track io.Reader", func(t *testing.T) { + var wantStatus Status + tracker := TrackerFunc(func(status Status, err error) error { + if status != wantStatus { + t.Errorf("TrackerFunc status = %v, want %v", status, wantStatus) + } + if err != nil { + t.Errorf("TrackerFunc err = %v, want nil", err) + } + return nil + }) + var reader io.Reader = bytes.NewReader(content) + reader = io.LimitReader(reader, int64(len(content))) // remove the io.WriterTo interface + gotReader := TrackReader(tracker, reader) + if _, ok := gotReader.(*readTracker); !ok { + t.Fatalf("TrackReader() = %v, want *readTracker", gotReader) + } + + wantStatus = Status{ + State: StateTransmitting, + Offset: bufSize, + } + buf := make([]byte, bufSize) + n, err := gotReader.Read(buf) + if err != nil { + t.Fatalf("TrackReader() error = %v, want nil", err) + } + if n != bufSize { + t.Fatalf("TrackReader() n = %v, want %v", n, bufSize) + } + if want := content[:bufSize]; !bytes.Equal(buf, want) { + t.Fatalf("TrackReader() buf = %v, want %v", buf, want) + } + + wantStatus = Status{ + State: StateTransmitting, + Offset: int64(len(content)), + } + n, err = gotReader.Read(buf) + if err != nil { + t.Fatalf("TrackReader() error = %v, want nil", err) + } + if want := len(content) - bufSize; n != want { + t.Fatalf("TrackReader() n = %v, want %v", n, want) + } + buf = buf[:n] + if want := content[bufSize:]; !bytes.Equal(buf, want) { + t.Fatalf("TrackReader() buf = %v, want %v", buf, want) + } + }) + + t.Run("track io.Reader + io.WriterTo", func(t *testing.T) { + var wantStatus Status + tracker := TrackerFunc(func(status Status, err error) error { + if status != wantStatus { + t.Errorf("TrackerFunc status = %v, want %v", status, wantStatus) + } + if err != nil { + t.Errorf("TrackerFunc err = %v, want nil", err) + } + return nil + }) + var reader io.Reader = bytes.NewReader(content) + gotReader := TrackReader(tracker, reader) + if _, ok := gotReader.(*readTrackerWriteTo); !ok { + t.Fatalf("TrackReader() = %v, want *readTrackerWriteTo", gotReader) + } + + wantStatus = Status{ + State: StateTransmitting, + Offset: bufSize, + } + buf := make([]byte, bufSize) + n, err := gotReader.Read(buf) + if err != nil { + t.Fatalf("TrackReader() error = %v, want nil", err) + } + if n != bufSize { + t.Fatalf("TrackReader() n = %v, want %v", n, bufSize) + } + if want := content[:bufSize]; !bytes.Equal(buf, want) { + t.Fatalf("TrackReader() buf = %v, want %v", buf, want) + } + + wantStatus = Status{ + State: StateTransmitting, + Offset: int64(len(content)), + } + writeBuf := bytes.NewBuffer(nil) + wn, err := gotReader.(io.WriterTo).WriteTo(writeBuf) + if err != nil { + t.Fatalf("TrackReader() error = %v, want nil", err) + } + if want := len(content) - bufSize; wn != int64(want) { + t.Fatalf("TrackReader() n = %v, want %v", wn, want) + } + buf = writeBuf.Bytes() + if want := content[bufSize:]; !bytes.Equal(buf, want) { + t.Fatalf("TrackReader() buf = %v, want %v", buf, want) + } + }) + + t.Run("empty io.Reader", func(t *testing.T) { + tracker := TrackerFunc(func(status Status, err error) error { + t.Errorf("TrackerFunc should not be called for empty read") + return nil + }) + gotReader := TrackReader(tracker, bytes.NewReader(nil)) + + buf := make([]byte, bufSize) + n, err := gotReader.Read(buf) + if want := io.EOF; err != want { + t.Fatalf("TrackReader() error = %v, want %v", err, want) + } + if want := 0; n != want { + t.Fatalf("TrackReader() n = %v, want %v", n, want) + } + + writeBuf := bytes.NewBuffer(nil) + wn, err := gotReader.(io.WriterTo).WriteTo(writeBuf) + if err != nil { + t.Fatalf("TrackReader() error = %v, want nil", err) + } + if want := int64(0); wn != want { + t.Fatalf("TrackReader() n = %v, want %v", wn, want) + } + buf = writeBuf.Bytes() + if want := []byte{}; !bytes.Equal(buf, want) { + t.Fatalf("TrackReader() buf = %v, want %v", buf, want) + } + }) + + t.Run("report failure", func(t *testing.T) { + var wantStatus Status + wantErr := errors.New("fail to track") + trackerMockStage := 0 + tracker := TrackerFunc(func(status Status, err error) error { + defer func() { + trackerMockStage++ + }() + switch trackerMockStage { + case 0: + if status != wantStatus { + t.Errorf("TrackerFunc status = %v, want %v", status, wantStatus) + } + if err != nil { + t.Errorf("TrackerFunc err = %v, want nil", err) + } + return wantErr + case 1: + var emptyStatus Status + if wantStatus := emptyStatus; status != wantStatus { + t.Errorf("TrackerFunc status = %v, want %v", status, wantStatus) + } + if err != wantErr { + t.Errorf("TrackerFunc err = %v, want %v", err, wantErr) + } + return nil + default: + t.Errorf("TrackerFunc should not be called") + return nil + } + }) + gotReader := TrackReader(tracker, bytes.NewReader(content)) + + wantStatus = Status{ + State: StateTransmitting, + Offset: bufSize, + } + buf := make([]byte, bufSize) + n, err := gotReader.Read(buf) + if err != wantErr { + t.Fatalf("TrackReader() error = %v, want %v", err, wantErr) + } + if n != bufSize { + t.Fatalf("TrackReader() n = %v, want %v", n, bufSize) + } + if want := content[:bufSize]; !bytes.Equal(buf, want) { + t.Fatalf("TrackReader() buf = %v, want %v", buf, want) + } + + wantStatus = Status{ + State: StateTransmitting, + Offset: int64(len(content)), + } + trackerMockStage = 0 + writeBuf := bytes.NewBuffer(nil) + wn, err := gotReader.(io.WriterTo).WriteTo(writeBuf) + if err != wantErr { + t.Fatalf("TrackReader() error = %v, want %v", err, wantErr) + } + if want := len(content) - bufSize; wn != int64(want) { + t.Fatalf("TrackReader() n = %v, want %v", wn, want) + } + buf = writeBuf.Bytes() + if want := content[bufSize:]; !bytes.Equal(buf, want) { + t.Fatalf("TrackReader() buf = %v, want %v", buf, want) + } + }) + + t.Run("process failure", func(t *testing.T) { + reportErr := io.ErrClosedPipe + var wantStatus Status + var wantErr error + tracker := TrackerFunc(func(status Status, err error) error { + if status != wantStatus { + t.Errorf("TrackerFunc status = %v, want %v", status, wantStatus) + } + if err != reportErr { + t.Errorf("TrackerFunc err = %v, want %v", err, reportErr) + } + return wantErr + }) + pipeReader, pipeWriter := io.Pipe() + pipeReader.Close() + pipeWriter.Close() + gotReader := TrackReader(tracker, pipeReader) + + buf := make([]byte, bufSize) + n, err := gotReader.Read(buf) + if err != reportErr { + t.Fatalf("TrackReader() error = %v, want %v", err, reportErr) + } + if want := 0; n != want { + t.Fatalf("TrackReader() n = %v, want %v", n, want) + } + + wantErr = errors.New("fail to track") + n, err = gotReader.Read(buf) + if err != wantErr { + t.Fatalf("TrackReader() error = %v, want %v", err, wantErr) + } + if want := 0; n != want { + t.Fatalf("TrackReader() n = %v, want %v", n, want) + } + + gotReader = TrackReader(tracker, io.MultiReader(pipeReader)) // wrap io.WriteTo + wantErr = nil + writeBuf := bytes.NewBuffer(nil) + wn, err := gotReader.(io.WriterTo).WriteTo(writeBuf) + if err != reportErr { + t.Fatalf("TrackReader() error = %v, want %v", err, reportErr) + } + if want := int64(0); wn != want { + t.Fatalf("TrackReader() n = %v, want %v", wn, want) + } + buf = writeBuf.Bytes() + if want := []byte{}; !bytes.Equal(buf, want) { + t.Fatalf("TrackReader() buf = %v, want %v", buf, want) + } + + gotReader = TrackReader(tracker, io.MultiReader(pipeReader)) // wrap io.WriteTo + wantErr = errors.New("fail to track") + wn, err = gotReader.(io.WriterTo).WriteTo(writeBuf) + if err != wantErr { + t.Fatalf("TrackReader() error = %v, want %v", err, wantErr) + } + if want := int64(0); wn != want { + t.Fatalf("TrackReader() n = %v, want %v", wn, want) + } + buf = writeBuf.Bytes() + if want := []byte{}; !bytes.Equal(buf, want) { + t.Fatalf("TrackReader() buf = %v, want %v", buf, want) + } + }) +} From dfb24a0b1cc03b591fc0c97d156706e8e576c376 Mon Sep 17 00:00:00 2001 From: Shiwei Zhang Date: Tue, 31 Dec 2024 16:23:27 +0800 Subject: [PATCH 12/15] refactor: remove unused ManagerFunc Signed-off-by: Shiwei Zhang --- internal/progress/manager.go | 17 ----------------- 1 file changed, 17 deletions(-) diff --git a/internal/progress/manager.go b/internal/progress/manager.go index 439b90d18..c44b79d34 100644 --- a/internal/progress/manager.go +++ b/internal/progress/manager.go @@ -29,20 +29,3 @@ type Manager interface { // Track starts tracking the progress of a descriptor. Track(desc ocispec.Descriptor) (Tracker, error) } - -// ManagerFunc is an adapter to allow the use of ordinary functions as Managers. -// If f is a function with the appropriate signature, ManagerFunc(f) is a -// [Manager] that calls f. -type ManagerFunc func(desc ocispec.Descriptor, status Status, err error) error - -// Close closes the manager. -func (f ManagerFunc) Close() error { - return nil -} - -// Track starts tracking the progress of a descriptor. -func (f ManagerFunc) Track(desc ocispec.Descriptor) (Tracker, error) { - return TrackerFunc(func(status Status, err error) error { - return f(desc, status, err) - }), nil -} From ca88b54a4576beb0d17a1490445186f3ec89bf24 Mon Sep 17 00:00:00 2001 From: Shiwei Zhang Date: Tue, 31 Dec 2024 17:34:09 +0800 Subject: [PATCH 13/15] feat: add failure case Signed-off-by: Shiwei Zhang --- .../display/status/progress/messenger.go | 1 + .../display/status/progress/status.go | 26 ++++++++++++++++--- 2 files changed, 23 insertions(+), 4 deletions(-) diff --git a/cmd/oras/internal/display/status/progress/messenger.go b/cmd/oras/internal/display/status/progress/messenger.go index cb9de3fba..f4da6cc2b 100644 --- a/cmd/oras/internal/display/status/progress/messenger.go +++ b/cmd/oras/internal/display/status/progress/messenger.go @@ -33,6 +33,7 @@ func (m *Messenger) Update(status progress.Status) error { } func (m *Messenger) Fail(err error) error { + m.ch <- fail(err) return nil } diff --git a/cmd/oras/internal/display/status/progress/status.go b/cmd/oras/internal/display/status/progress/status.go index 7d35bc1b2..57b46dadf 100644 --- a/cmd/oras/internal/display/status/progress/status.go +++ b/cmd/oras/internal/display/status/progress/status.go @@ -39,11 +39,13 @@ var ( spinnerColor = aec.LightYellowF doneMarkColor = aec.LightGreenF progressColor = aec.LightBlueB + failureColor = aec.LightRedF ) // status is used as message to update progress view. type status struct { done bool // done is true when the end time is set + err error prompt string descriptor ocispec.Descriptor offset int64 @@ -90,6 +92,13 @@ func endTiming() *status { } } +func fail(err error) *status { + return &status{ + err: err, + offset: -1, + } +} + func (s *status) isZero() bool { return s.offset < 0 && s.startTime.IsZero() && s.endTime.IsZero() } @@ -140,9 +149,13 @@ func (s *status) String(width int) (string, string) { lenBar := int(percent * barLength) bar := fmt.Sprintf("[%s%s]", progressColor.Apply(strings.Repeat(" ", lenBar)), strings.Repeat(".", barLength-lenBar)) speed := s.calculateSpeed() - left = fmt.Sprintf("%s %s(%*s/s) %s %s", - spinnerColor.Apply(string(s.mark.symbol())), - bar, speedLength, speed, s.prompt, name) + var mark string + if s.err == nil { + mark = spinnerColor.Apply(string(s.mark.symbol())) + } else { + mark = failureColor.Apply("✗") + } + left = fmt.Sprintf("%s %s(%*s/s) %s %s", mark, bar, speedLength, speed, s.prompt, name) // bar + wrapper(2) + space(1) + speed + "/s"(2) + wrapper(2) = len(bar) + len(speed) + 7 lenLeft = barLength + speedLength + 7 } else { @@ -199,6 +212,9 @@ func (s *status) update(n *status) { s.lock.Lock() defer s.lock.Unlock() + if n.err != nil { + s.err = n.err + } if n.offset >= 0 { s.offset = n.offset } @@ -211,6 +227,8 @@ func (s *status) update(n *status) { } if !n.endTime.IsZero() { s.endTime = n.endTime - s.done = true + if s.err == nil { + s.done = true + } } } From 44dd2faa2482f02fe8873aa8c43e172885e0e875 Mon Sep 17 00:00:00 2001 From: Shiwei Zhang Date: Tue, 31 Dec 2024 17:49:31 +0800 Subject: [PATCH 14/15] test: add test for status Signed-off-by: Shiwei Zhang --- .../display/status/progress/status_test.go | 28 +++++++++++++++++++ 1 file changed, 28 insertions(+) diff --git a/cmd/oras/internal/display/status/progress/status_test.go b/cmd/oras/internal/display/status/progress/status_test.go index cc03f44af..9e16a09b5 100644 --- a/cmd/oras/internal/display/status/progress/status_test.go +++ b/cmd/oras/internal/display/status/progress/status_test.go @@ -18,6 +18,7 @@ limitations under the License. package progress import ( + "context" "testing" "time" @@ -99,6 +100,33 @@ func Test_status_String_zeroWidth(t *testing.T) { t.Error(err) } } + +func Test_status_String_failure(t *testing.T) { + // zero status and progress + s := newStatus(ocispec.Descriptor{ + MediaType: "application/vnd.oci.empty.oras.test.v1+json", + Size: 2, + Digest: "sha256:44136fa355b3678a1146ad16f7e8649e94fb4fc21fe77e8310c060f61caaff8a", + }) + if status, digest := s.String(console.MinWidth); status != zeroStatus || digest != zeroDigest { + t.Errorf("status.String() = %v, %v, want %v, %v", status, digest, zeroStatus, zeroDigest) + } + + // done with failure + s.update(&status{ + err: context.Canceled, + prompt: "test", + descriptor: s.descriptor, + offset: 1, + startTime: time.Now().Add(-time.Minute), + endTime: time.Now(), + }) + statusStr, digestStr := s.String(120) + if err := testutils.OrderedMatch(statusStr+digestStr, "✗", s.prompt, s.descriptor.MediaType, "1.00/2 B", "50.00%", s.descriptor.Digest.String()); err != nil { + t.Error(err) + } +} + func Test_status_durationString(t *testing.T) { // zero duration s := newStatus(ocispec.Descriptor{}) From 41eef7ada7ee74ee087346c540e94ecc6d1ec03c Mon Sep 17 00:00:00 2001 From: Shiwei Zhang Date: Thu, 13 Feb 2025 17:25:48 +0800 Subject: [PATCH 15/15] docs: fix typo Signed-off-by: Shiwei Zhang --- cmd/oras/internal/display/status/progress/speed.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmd/oras/internal/display/status/progress/speed.go b/cmd/oras/internal/display/status/progress/speed.go index 75a03184d..7a1acf5bf 100644 --- a/cmd/oras/internal/display/status/progress/speed.go +++ b/cmd/oras/internal/display/status/progress/speed.go @@ -50,7 +50,7 @@ func (w *speedWindow) Add(time time.Time, offset int64) { // Mean returns the mean speed of the window with unit of byte per second. func (w *speedWindow) Mean() float64 { if w.size < 2 { - // no speed diplayed for first read + // no speed displayed for first read return 0 }