Skip to content

Commit

Permalink
feat: first implementation of the tokens logic
Browse files Browse the repository at this point in the history
  • Loading branch information
vadasambar committed Dec 11, 2023
1 parent a5684d1 commit 64264f2
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 13 deletions.
4 changes: 4 additions & 0 deletions cmd/plugin/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,10 @@ func main() {
klog.Fatalf("The mode of the driver is required.")
}

if !*asyncImagePullMount && *maxInflightPulls > 0 {
klog.Fatalf("--max-in-flight-pulls (current value: '%v') can only be used with --async-pull-mount=true (current value: %v)", *maxInflightPulls, *asyncImagePullMount)
}

server := csicommon.NewNonBlockingGRPCServer()

switch *mode {
Expand Down
39 changes: 26 additions & 13 deletions pkg/pullexecutor/pullexecutor.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,24 +44,31 @@ type PullOptions struct {

// PullExecutor executes the pulls
type PullExecutor struct {
asyncPull bool
imageSvcClient cri.ImageServiceClient
mutex *sync.Mutex
asyncErrs map[docker.Named]error
secretStore secret.Store
mounter *backend.SnapshotMounter
maxInflightPulls int
asyncPull bool
imageSvcClient cri.ImageServiceClient
mutex *sync.Mutex
asyncErrs map[docker.Named]error
secretStore secret.Store
mounter *backend.SnapshotMounter
tokens chan struct{}
}

// NewPullExecutor initializes a new pull executor object
func NewPullExecutor(o *PullExecutorOptions) *PullExecutor {

var tokens chan struct{}

if o.MaxInflightPulls > 0 {
tokens = make(chan struct{}, o.MaxInflightPulls)
}

return &PullExecutor{
asyncPull: o.AsyncPull,
mutex: &sync.Mutex{},
imageSvcClient: o.ImageServiceClient,
secretStore: o.SecretStore,
mounter: o.Mounter,
maxInflightPulls: o.MaxInflightPulls,
asyncPull: o.AsyncPull,
mutex: &sync.Mutex{},
imageSvcClient: o.ImageServiceClient,
secretStore: o.SecretStore,
mounter: o.Mounter,
tokens: tokens,
}
}

Expand Down Expand Up @@ -107,6 +114,12 @@ func (m *PullExecutor) StartPulling(o *PullOptions) error {
puller := remoteimage.NewPuller(m.imageSvcClient, o.NamedRef, keyring)
shouldPull := o.PullAlways || !m.mounter.ImageExists(o.Context, o.NamedRef)
if shouldPull {
if m.tokens != nil {
m.tokens <- struct{}{}
defer func() {
<-m.tokens
}()
}
klog.Infof("pull image %q ", o.Image)
pullstatus.Update(o.NamedRef, pullstatus.StillPulling)
if err = puller.Pull(c); err != nil {
Expand Down

0 comments on commit 64264f2

Please sign in to comment.