Skip to content

Commit

Permalink
fixed buG resulting from uniform extraction of image string from dock…
Browse files Browse the repository at this point in the history
…er.Named struct
  • Loading branch information
imuni4fun committed Feb 28, 2024
1 parent 6f13aba commit f3cd52d
Show file tree
Hide file tree
Showing 6 changed files with 41 additions and 20 deletions.
13 changes: 9 additions & 4 deletions pkg/remoteimage/pull.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ import (

type Puller interface {
Pull(context.Context) error
Image() string
ImageWithTag() string
ImageWithoutTag() string
ImageSize(context.Context) (int, error)
}

Expand All @@ -34,7 +35,11 @@ type puller struct {
keyring credentialprovider.DockerKeyring
}

func (p puller) Image() string {
func (p puller) ImageWithTag() string {
return p.image.String()
}

func (p puller) ImageWithoutTag() string {
return p.image.Name()
}

Expand Down Expand Up @@ -100,8 +105,8 @@ func (p puller) Pull(ctx context.Context) (err error) {
}
}
}()
repo := p.Image()
imageSpec := &cri.ImageSpec{Image: p.Image()}
repo := p.ImageWithoutTag()
imageSpec := &cri.ImageSpec{Image: p.ImageWithTag()}
creds, withCredentials := p.keyring.Lookup(repo)
// klog.V(2).Infof("remoteimage.Pull(): len(creds)=%d, withCreds=%t", len(creds), withCredentials)
if !withCredentials {
Expand Down
12 changes: 12 additions & 0 deletions pkg/remoteimageasync/patterns_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,12 @@ package remoteimageasync

import (
"fmt"
"strings"
"testing"

"github.com/containerd/containerd/reference/docker"
"github.com/stretchr/testify/assert"
"github.com/warm-metal/container-image-csi-driver/pkg/remoteimage"
)

// demonstrates session channel structure's pass-by-reference is appropriate
Expand Down Expand Up @@ -72,3 +75,12 @@ func TestChannelClose(t *testing.T) {
assert.NotNil(t, err, "error should have been returned")
assert.Contains(t, err.Error(), "closed", "error should indicate channel closed")
}

func TestNamedImageExtraction(t *testing.T) {
parsed, err := docker.ParseDockerRef(nonExistentImage)
assert.Nil(t, err, "parsing image name should succeed")
puller := remoteimage.NewPuller(nil, parsed, nil)
assert.Equal(t, nonExistentImage, puller.ImageWithTag(), "extracted value should match exactly %v", puller)
repo := strings.Split(nonExistentImage, ":")[0]
assert.Equal(t, repo, puller.ImageWithoutTag(), "extracted value should match exactly %v", puller)
}
6 changes: 3 additions & 3 deletions pkg/remoteimageasync/puller.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func RunPullerLoop(
}
go func() {
klog.V(2).Infof("%s.RunPullerLoop(): asked to pull image %s with timeout %v\n",
prefix, ses.Image(), ses.timeout)
prefix, ses.ImageWithTag(), ses.timeout)
ctxAsyncPullTimeoutOrShutdown, cancelDontCare := context.WithTimeout(ctx, ses.timeout) // combine session timeout and shut down signal into one
defer cancelDontCare() // IF we exit, this no longer matters. calling to satisfy linter.
pullStart := time.Now()
Expand All @@ -41,13 +41,13 @@ func RunPullerLoop(
metrics.OperationErrorsCount.WithLabelValues("pull-async-shutdown").Inc()
case <-ctxAsyncPullTimeoutOrShutdown.Done(): // async pull timeout or shutdown
ses.isTimedOut = true
ses.err = fmt.Errorf("%s.RunPullerLoop(): async pull exceeded timeout of %v for image %s", prefix, ses.timeout, ses.Image())
ses.err = fmt.Errorf("%s.RunPullerLoop(): async pull exceeded timeout of %v for image %s", prefix, ses.timeout, ses.ImageWithTag())
klog.V(2).Infof(ses.err.Error())
metrics.OperationErrorsCount.WithLabelValues("pull-async-timeout").Inc()
default: // completion: success or error
ses.isTimedOut = false
ses.err = pullErr
klog.V(2).Infof("%s.RunPullerLoop(): pull completed in %v for image %s with error=%v\n", prefix, time.Since(pullStart), ses.Image(), ses.err)
klog.V(2).Infof("%s.RunPullerLoop(): pull completed in %v for image %s with error=%v\n", prefix, time.Since(pullStart), ses.ImageWithTag(), ses.err)
if ses.err != nil {
metrics.OperationErrorsCount.WithLabelValues("pull-async-error").Inc()
}
Expand Down
20 changes: 10 additions & 10 deletions pkg/remoteimageasync/synchronizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ func StartAsyncPuller(ctx context.Context, sessionChanDepth int) AsyncPuller {
completedFunc := func(ses *PullSession) { // remove session from session map (since no longer active for continuation)
async.mutex.Lock()
defer async.mutex.Unlock()
klog.V(2).Infof("%s.StartAsyncPuller(): clearing session for %s", prefix, ses.Image())
delete(async.sessionMap, ses.Image()) // no-op if already deleted
klog.V(2).Infof("%s.StartAsyncPuller(): clearing session for %s", prefix, ses.ImageWithTag())
delete(async.sessionMap, ses.ImageWithTag()) // no-op if already deleted
}
RunPullerLoop(ctx, sessionChan, completedFunc)
klog.Infof("%s.StartAsyncPuller(): async puller is operational", prefix)
Expand Down Expand Up @@ -63,37 +63,37 @@ func (s synchronizer) StartPull(image string, puller remoteimage.Puller, asyncPu
if rec := recover(); rec != nil { // handle session write panic due to closed sessionChan
// override named return values
ses = nil
err = fmt.Errorf("%s.StartPull(): cannot create pull session for %s at this time, reason: %v", prefix, ses.Image(), rec)
err = fmt.Errorf("%s.StartPull(): cannot create pull session for %s at this time, reason: %v", prefix, ses.ImageWithTag(), rec)
klog.V(2).Info(err.Error())
}
}()
select {
case s.sessions <- ses: // start session, check for deadlock... possibility of panic but only during app shutdown where Puller has already ceased to operate, handle with defer/recover
klog.V(2).Infof("%s.StartPull(): new session created for %s with timeout %v", prefix, ses.Image(), ses.timeout)
klog.V(2).Infof("%s.StartPull(): new session created for %s with timeout %v", prefix, ses.ImageWithTag(), ses.timeout)
s.sessionMap[image] = ses // add session to map to allow continuation... only do this because was passed to puller via sessions channel
return ses, nil
default: // catch deadlock or throttling (they may look the same)
err := fmt.Errorf("%s.StartPull(): cannot create pull session for %s at this time, throttling or deadlock condition exists, retry if throttling", prefix, ses.Image())
err := fmt.Errorf("%s.StartPull(): cannot create pull session for %s at this time, throttling or deadlock condition exists, retry if throttling", prefix, ses.ImageWithTag())
klog.V(2).Info(err.Error())
return nil, err
}
} else {
klog.V(2).Infof("%s.StartPull(): found open session for %s", prefix, ses.Image())
klog.V(2).Infof("%s.StartPull(): found open session for %s", prefix, ses.ImageWithTag())
// return session and unlock
return ses, nil
}
}

func (s synchronizer) WaitForPull(session *PullSession, callerTimeout context.Context) error {
klog.V(2).Infof("%s.WaitForPull(): starting to wait for image %s", prefix, session.Image())
defer klog.V(2).Infof("%s.WaitForPull(): exiting wait for image %s", prefix, session.Image())
klog.V(2).Infof("%s.WaitForPull(): starting to wait for image %s", prefix, session.ImageWithTag())
defer klog.V(2).Infof("%s.WaitForPull(): exiting wait for image %s", prefix, session.ImageWithTag())
select {
case <-session.done: // success or error (including session timeout and shutting down)
klog.V(2).Infof("%s.WaitForPull(): session completed with success or error for image %s, error=%v", prefix, session.Image(), session.err)
klog.V(2).Infof("%s.WaitForPull(): session completed with success or error for image %s, error=%v", prefix, session.ImageWithTag(), session.err)
return session.err
case <-callerTimeout.Done(): // caller timeout
err := fmt.Errorf("%s.WaitForPull(): this wait for image %s has timed out due to caller context cancellation, pull likely continues in the background",
prefix, session.Image())
prefix, session.ImageWithTag())
klog.V(2).Info(err.Error())
return err
}
Expand Down
6 changes: 5 additions & 1 deletion pkg/remoteimageasync/synchronizer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,10 +212,14 @@ func (p pullerMock) Pull(ctx context.Context) (err error) {
}
}

func (p pullerMock) Image() string {
func (p pullerMock) ImageWithTag() string {
return p.image
}

func (p pullerMock) ImageWithoutTag() string {
panic("Not implemented")
}

func (p pullerMock) ImageSize(ctx context.Context) (int, error) {
if p.size < 0 {
return 0, fmt.Errorf("error occurred when checking image size")
Expand Down
4 changes: 2 additions & 2 deletions pkg/remoteimageasync/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ type PullSession struct {
err error
}

func (p PullSession) Image() string {
return p.puller.Image()
func (p PullSession) ImageWithTag() string {
return p.puller.ImageWithTag()
}

type synchronizer struct {
Expand Down

0 comments on commit f3cd52d

Please sign in to comment.