Skip to content

refactor(progress): generic progress tracking #1524

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 30 commits into from
Feb 19, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
ae2a994
feat: add track manager
shizhMSFT Oct 31, 2024
09316f3
feat: use new track
shizhMSFT Oct 31, 2024
96fc648
refactor: new reader
shizhMSFT Nov 1, 2024
3b508d7
doc: improve the doc for Record
shizhMSFT Nov 1, 2024
aaa57b2
refactor: better tracker
shizhMSFT Nov 1, 2024
349a93b
nit: formatting
shizhMSFT Nov 1, 2024
a6a5204
feat: import oras progress package
shizhMSFT Dec 30, 2024
c54d0df
feat!: migrate to the new progress package
shizhMSFT Dec 30, 2024
0b60ea3
fix: fix bugs and tests
shizhMSFT Dec 30, 2024
b90bd20
fix: fix status
shizhMSFT Dec 30, 2024
bec200d
fix: fix tracker and add tests
shizhMSFT Dec 31, 2024
f1a5a3f
refactor: remove unused ManagerFunc
shizhMSFT Dec 31, 2024
19d6361
feat: add failure case
shizhMSFT Dec 31, 2024
fa8bddd
test: add test for status
shizhMSFT Dec 31, 2024
0623344
docs: fix typo
shizhMSFT Feb 13, 2025
1f7aaac
refactor: better render control
shizhMSFT Feb 17, 2025
d323a45
refactor: refactor status
shizhMSFT Feb 17, 2025
c97e76d
docs: add doc to private methods
shizhMSFT Feb 17, 2025
756d40a
fix: fix the accidental reader close by http client
shizhMSFT Feb 17, 2025
b3954f3
fix: fix tty issue
shizhMSFT Feb 17, 2025
d98616a
chore: ignore error for stop tracker
shizhMSFT Feb 17, 2025
191abef
test: init status tests
shizhMSFT Feb 17, 2025
6c9775e
test: test status
shizhMSFT Feb 18, 2025
476a25e
refactor: simplify and test messenger
shizhMSFT Feb 18, 2025
6a54bc1
test: test manager
shizhMSFT Feb 18, 2025
5aa6733
feat: add a new restored status
shizhMSFT Feb 18, 2025
8bddee4
refactor: rename prompt to prompts
shizhMSFT Feb 19, 2025
bc1e555
docs: update docs for test utils
shizhMSFT Feb 19, 2025
cee298f
refactor: make messenger private
shizhMSFT Feb 19, 2025
9b0a7a1
docs: add docs to messenger
shizhMSFT Feb 19, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 37 additions & 42 deletions cmd/oras/internal/display/status/progress/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,46 +23,46 @@

ocispec "github.com/opencontainers/image-spec/specs-go/v1"
"oras.land/oras/cmd/oras/internal/display/status/console"
"oras.land/oras/internal/progress"
)

const (
// BufferSize is the size of the status channel buffer.
BufferSize = 1
// bufferSize is the size of the status channel buffer.
bufferSize = 1
framePerSecond = 5
bufFlushDuration = time.Second / framePerSecond
)

var errManagerStopped = errors.New("progress output manager has already been stopped")

// Manager is progress view master
type Manager interface {
Add() (*Messenger, error)
SendAndStop(desc ocispec.Descriptor, prompt string) error
Close() error
}

type manager struct {
status []*status
statusLock sync.RWMutex
console console.Console
updating sync.WaitGroup
renderDone chan struct{}
renderClosed chan struct{}
prompts map[progress.State]string
}

// NewManager initialized a new progress manager.
func NewManager(tty *os.File) (Manager, error) {
func NewManager(tty *os.File, prompts map[progress.State]string) (progress.Manager, error) {
c, err := console.NewConsole(tty)
if err != nil {
return nil, err
}
return newManager(c, prompts), nil
}

func newManager(c console.Console, prompts map[progress.State]string) progress.Manager {
m := &manager{
console: c,
renderDone: make(chan struct{}),
renderClosed: make(chan struct{}),
prompts: prompts,
}
m.start()
return m, nil
return m
}

func (m *manager) start() {
Expand All @@ -87,59 +87,54 @@
func (m *manager) render() {
m.statusLock.RLock()
defer m.statusLock.RUnlock()
// todo: update size in another routine

// render with culling: only the latter statuses are rendered.
models := m.status
height, width := m.console.GetHeightWidth()
lineCount := len(m.status) * 2
offset := 0
if lineCount > height {
// skip statuses that cannot be rendered
offset = lineCount - height
if n := len(m.status) - height/2; n > 0 {
models = models[n:]
if height%2 == 1 {
view := m.status[n-1].Render(width)
m.console.OutputTo(uint(len(models)*2+1), view[1])
}

Check warning on line 99 in cmd/oras/internal/display/status/progress/manager.go

View check run for this annotation

Codecov / codecov/patch

cmd/oras/internal/display/status/progress/manager.go#L95-L99

Added lines #L95 - L99 were not covered by tests
}

for ; offset < lineCount; offset += 2 {
status, progress := m.status[offset/2].String(width)
m.console.OutputTo(uint(lineCount-offset), status)
m.console.OutputTo(uint(lineCount-offset-1), progress)
viewHeight := len(models) * 2
for i, model := range models {
view := model.Render(width)
m.console.OutputTo(uint(viewHeight-i*2), view[0])
m.console.OutputTo(uint(viewHeight-i*2-1), view[1])
}
}

// Add appends a new status with 2-line space for rendering.
func (m *manager) Add() (*Messenger, error) {
// Track appends a new status with 2-line space for rendering.
func (m *manager) Track(desc ocispec.Descriptor) (progress.Tracker, error) {
if m.closed() {
return nil, errManagerStopped
}

s := newStatus()
s := newStatus(desc)
m.statusLock.Lock()
m.status = append(m.status, s)
m.statusLock.Unlock()

defer m.console.NewRow()
defer m.console.NewRow()
return m.statusChan(s), nil
return m.newTracker(s), nil
}

// SendAndStop send message for descriptor and stop timing.
func (m *manager) SendAndStop(desc ocispec.Descriptor, prompt string) error {
messenger, err := m.Add()
if err != nil {
return err
}
messenger.Send(prompt, desc, desc.Size)
messenger.Stop()
return nil
}

func (m *manager) statusChan(s *status) *Messenger {
ch := make(chan *status, BufferSize)
func (m *manager) newTracker(s *status) progress.Tracker {
ch := make(chan statusUpdate, bufferSize)
m.updating.Add(1)
go func() {
defer m.updating.Done()
for newStatus := range ch {
s.update(newStatus)
for update := range ch {
update(s)
}
}()
return &Messenger{ch: ch}
return &messenger{
update: ch,
prompts: m.prompts,
}
}

// Close stops all status and waits for updating and rendering.
Expand Down
100 changes: 75 additions & 25 deletions cmd/oras/internal/display/status/progress/manager_test.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
//go:build freebsd || linux || netbsd || openbsd || solaris

/*
Copyright The ORAS Authors.
Licensed under the Apache License, Version 2.0 (the "License");
Expand All @@ -18,40 +16,92 @@ limitations under the License.
package progress

import (
"fmt"
"regexp"
"testing"

ocispec "github.com/opencontainers/image-spec/specs-go/v1"
"oras.land/oras/cmd/oras/internal/display/status/console"
"oras.land/oras/internal/testutils"
"oras.land/oras/internal/progress"
)

func Test_manager_render(t *testing.T) {
pty, device, err := testutils.NewPty()
if err != nil {
t.Fatal(err)
type mockConsole struct {
console.Console

view []string
height int
width int
}

func newMockConsole(width, height int) *mockConsole {
return &mockConsole{
height: height,
width: width,
}
}

func (c *mockConsole) GetHeightWidth() (int, int) {
return c.height, c.width
}

func (c *mockConsole) NewRow() {
c.view = append(c.view, "")
}

func (c *mockConsole) OutputTo(upCnt uint, str string) {
c.view[len(c.view)-int(upCnt)] = str
}

func (c *mockConsole) Restore() {}

func (c *mockConsole) Save() {}

func Test_manager(t *testing.T) {
desc := ocispec.Descriptor{
MediaType: "application/vnd.docker.image.rootfs.diff.tar.gzip",
Size: 1234567890,
Digest: "sha256:c775e7b757ede630cd0aa1113bd102661ab38829ca52a6422ab782862f268646",
Annotations: map[string]string{
"org.opencontainers.image.title": "hello.bin",
},
}
defer device.Close()
sole, err := console.NewConsole(device)

// simulate a console run
c := newMockConsole(80, 24)
m := newManager(c, map[progress.State]string{
progress.StateExists: "Exists",
})
tracker, err := m.Track(desc)
if err != nil {
t.Fatal(err)
t.Fatalf("manager.Track() error = %v, wantErr nil", err)
}
if err = tracker.Update(progress.Status{
State: progress.StateExists,
Offset: -1,
}); err != nil {
t.Errorf("tracker.Update() error = %v, wantErr nil", err)
}
if err := tracker.Close(); err != nil {
t.Errorf("tracker.Close() error = %v, wantErr nil", err)
}
if err := m.Close(); err != nil {
t.Errorf("manager.Close() error = %v, wantErr nil", err)
}

m := &manager{
console: sole,
// verify the console output
want := []string{
"✓ Exists hello.bin 1.15/1.15 GB 100.00% 0s",
" └─ sha256:c775e7b757ede630cd0aa1113bd102661ab38829ca52a6422ab782862f268646 ",
}
height, _ := m.console.GetHeightWidth()
for i := 0; i < height; i++ {
if _, err := m.Add(); err != nil {
t.Fatal(err)
}
if len(c.view) != len(want) {
t.Errorf("console view length = %d, want %d", len(c.view), len(want))
}
m.render()
// validate
var want []string
for i := height; i > 0; i -= 2 {
want = append(want, fmt.Sprintf("%dF%s", i, zeroStatus))
escRegexp := regexp.MustCompile("\x1b\\[[0-9]+m")
equal := func(got, want string) bool {
return escRegexp.ReplaceAllString(got, "") == want
}
if err = testutils.MatchPty(pty, device, want...); err != nil {
t.Fatal(err)
for i, v := range want {
if !equal(c.view[i], v) {
t.Errorf("console view[%d] = %q, want %q", i, c.view[i], v)
}
}
}
94 changes: 29 additions & 65 deletions cmd/oras/internal/display/status/progress/messenger.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,81 +15,45 @@ limitations under the License.

package progress

import (
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
"oras.land/oras/cmd/oras/internal/display/status/progress/humanize"
"time"
)
import "oras.land/oras/internal/progress"

// Messenger is progress message channel.
type Messenger struct {
ch chan *status
closed bool
// messenger is progress message channel.
type messenger struct {
update chan statusUpdate
closed bool
prompts map[progress.State]string
}

// Start initializes the messenger.
func (sm *Messenger) Start() {
if sm.ch == nil {
return
}
sm.ch <- startTiming()
}

// Send a status message for the specified descriptor.
func (sm *Messenger) Send(prompt string, descriptor ocispec.Descriptor, offset int64) {
for {
// Update sends the status to the message channel.
func (m *messenger) Update(status progress.Status) error {
switch status.State {
case progress.StateInitialized:
m.update <- updateStatusStartTime()
case progress.StateTransmitting:
select {
case sm.ch <- newStatusMessage(prompt, descriptor, offset):
return
case <-sm.ch:
// purge the channel until successfully pushed
case m.update <- updateStatusMessage(m.prompts[progress.StateTransmitting], status.Offset):
default:
// ch is nil
return
// drop message if channel is full
}
default:
m.update <- updateStatusMessage(m.prompts[status.State], status.Offset)
}
return nil
}

// Stop the messenger after sending a end message.
func (sm *Messenger) Stop() {
if sm.closed {
return
}
sm.ch <- endTiming()
close(sm.ch)
sm.closed = true
}

// newStatus generates a base empty status.
func newStatus() *status {
return &status{
offset: -1,
total: humanize.ToBytes(0),
speedWindow: newSpeedWindow(framePerSecond),
}
}

// newStatusMessage generates a status for messaging.
func newStatusMessage(prompt string, descriptor ocispec.Descriptor, offset int64) *status {
return &status{
prompt: prompt,
descriptor: descriptor,
offset: offset,
}
}

// startTiming creates start timing message.
func startTiming() *status {
return &status{
offset: -1,
startTime: time.Now(),
}
// Fail sends the error to the message channel.
func (m *messenger) Fail(err error) error {
m.update <- updateStatusError(err)
return nil
}

// endTiming creates end timing message.
func endTiming() *status {
return &status{
offset: -1,
endTime: time.Now(),
// Close marks the progress as completed and closes the message channel.
func (m *messenger) Close() error {
if m.closed {
return nil
}
m.update <- updateStatusEndTime()
close(m.update)
m.closed = true
return nil
}
Loading
Loading