feat: async image pull and mount
feat: wip add tests to reproduce `context deadline exceeded` problem

test: add test that calls `NodePublishVolume` repeatedly for 2 minutes until the image is downloaded
- this should fail with the current code

feat: wip add logic for async pull
- not working right now

fix: async image pull timing out
- mount is still a bottleneck

feat: wip working async mount
- infrequently fails with race conditions (<- looking into this)

feat: fix concurrency errors
- abstract mounting and pulling logic to separate packages
- add readme for running tests

refactor: remove redundant `uuid` related code

fix: CI lint/fmt errors
- use `defer cancel()` instead of discarding the fn
- fix log

fix: update mount status on finishing sync mount

chore: remove extra whitespace in the log

refactor: use localized error
- to address PR comment and avoid confusion

refactor: define magic numbers as constants

refactor: define mount and pull ctx timeout as constants

refactor: rename `--async-image-pulls` -> `--async-pull-mount`

fix: ci failing because of image pull error

fix: mounter is nil

fix: image not mounting if the image already exists on the node

fix: mounting the same volume more than once doesn't work

refactor: add log if image hasn't been pulled or mount is still in progress

refactor: make `NewNodeServer` call more compact

refactor: defer unlocking mutex instead of explicitly calling unlock at multiple places

refactor: remove leftover `ctx` in print statement

chore: bump version in Makefile and Chart.yaml

chore: revert bump to `v0.8.0`
- causes CI to fail
vadasambar committed Nov 30, 2023
1 parent 40166cf commit 00e9632
Showing 16 changed files with 921 additions and 31 deletions.
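The gist of the change, before the per-file diffs: pulling and mounting inline can blow past the CSI RPC deadline for large images, so the work is now started once in the background and tracked per volume, and kubelet's retries of `NodePublishVolume` poll that status until it completes. A minimal, generic sketch of the pattern (all names here are illustrative, not the driver's actual code):

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// status of a background pull/mount, keyed by volume ID.
type status int

const (
	statusUnknown status = iota
	statusInProgress
	statusDone
	statusFailed
)

type executor struct {
	mu       sync.Mutex
	statuses map[string]status
}

// start launches fn for volumeID exactly once; repeated calls (from kubelet
// retrying NodePublishVolume) just observe the recorded status.
func (e *executor) start(volumeID string, fn func() error) {
	e.mu.Lock()
	defer e.mu.Unlock()
	if e.statuses[volumeID] != statusUnknown {
		return // already in progress or finished
	}
	e.statuses[volumeID] = statusInProgress
	go func() {
		err := fn()
		e.mu.Lock()
		defer e.mu.Unlock()
		if err != nil {
			e.statuses[volumeID] = statusFailed
			return
		}
		e.statuses[volumeID] = statusDone
	}()
}

// wait polls until the work finishes or the (deliberately short) timeout
// elapses; on timeout the RPC returns DeadlineExceeded and kubelet retries.
func (e *executor) wait(volumeID string, timeout time.Duration) error {
	deadline := time.Now().Add(timeout)
	for time.Now().Before(deadline) {
		e.mu.Lock()
		s := e.statuses[volumeID]
		e.mu.Unlock()
		switch s {
		case statusDone:
			return nil
		case statusFailed:
			return fmt.Errorf("background work for volume %q failed", volumeID)
		}
		time.Sleep(100 * time.Millisecond)
	}
	return fmt.Errorf("volume %q: still in progress", volumeID)
}

func main() {
	e := &executor{statuses: map[string]status{}}
	e.start("vol-1", func() error {
		time.Sleep(300 * time.Millisecond) // stand-in for a slow image pull
		return nil
	})
	if err := e.wait("vol-1", time.Second); err != nil {
		fmt.Println("would map to codes.DeadlineExceeded; kubelet retries:", err)
		return
	}
	fmt.Println("pull finished; mount can proceed")
}
```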
3 changes: 2 additions & 1 deletion .gitignore
@@ -1,4 +1,5 @@
/_output
/.idea
id_rsa*
.cache
vendor
8 changes: 8 additions & 0 deletions charts/warm-metal-csi-driver/Chart.yaml
@@ -15,9 +15,17 @@ type: application
# This is the chart version. This version number should be incremented each time you make changes
# to the chart and its templates, including the app version.
# Versions are expected to follow Semantic Versioning (https://semver.org/)
version: 0.7.0

# This is the version number of the application being deployed. This version number should be
# incremented each time you make changes to the application. Versions are not expected to
# follow Semantic Versioning. They should reflect the version the application is using.
appVersion: v0.7.0
6 changes: 5 additions & 1 deletion cmd/plugin/main.go
@@ -51,6 +51,8 @@ var (
enableCache = flag.Bool("enable-daemon-image-credential-cache", true,
"Whether to save contents of imagepullsecrets of the daemon ServiceAccount in memory. "+
"If set to false, secrets will be fetched from the API server on every image pull.")
asyncImagePullMount = flag.Bool("async-pull-mount", false,
"Whether to pull images asynchronously (helps prevent timeout for larger images)")
watcherResyncPeriod = flag.Duration("watcher-resync-period", 30*time.Minute, "The resync period of the pvc watcher.")
mode = flag.String("mode", "", "The mode of the driver. Valid values are: node, controller")
nodePluginSA = flag.String("node-plugin-sa", "csi-image-warm-metal", "The name of the ServiceAccount used by the node plugin.")
@@ -122,10 +124,12 @@ func main() {
klog.Fatalf(`unable to connect to cri daemon "%s": %s`, *endpoint, err)
}

secretStore := secret.CreateStoreOrDie(*icpConf, *icpBin, *nodePluginSA, *enableCache)

server.Start(*endpoint,
NewIdentityServer(driverVersion),
nil,
NewNodeServer(driver, mounter, criClient, secret.CreateStoreOrDie(*icpConf, *icpBin, *nodePluginSA, *enableCache)))
NewNodeServer(driver, mounter, criClient, secretStore, *asyncImagePullMount))
case controllerMode:
watcher, err := watcher.New(context.Background(), *watcherResyncPeriod)
if err != nil {
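For illustration, enabling the new behavior means starting the node plugin with `--async-pull-mount=true` in addition to its existing arguments; the flag defaults to `false`, so existing deployments keep the synchronous path. (The exact manifest wiring is not shown in this excerpt.)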
91 changes: 65 additions & 26 deletions cmd/plugin/node_server.go
@@ -8,7 +8,9 @@ import (
"github.com/container-storage-interface/spec/lib/go/csi"
"github.com/containerd/containerd/reference/docker"
"github.com/warm-metal/csi-driver-image/pkg/backend"
"github.com/warm-metal/csi-driver-image/pkg/remoteimage"
"github.com/warm-metal/csi-driver-image/pkg/mountexecutor"
"github.com/warm-metal/csi-driver-image/pkg/mountstatus"
"github.com/warm-metal/csi-driver-image/pkg/pullexecutor"
"github.com/warm-metal/csi-driver-image/pkg/secret"
csicommon "github.com/warm-metal/csi-drivers/pkg/csi-common"
"google.golang.org/grpc/codes"
@@ -25,20 +27,34 @@ const (
ctxKeyEphemeralVolume = "csi.storage.k8s.io/ephemeral"
)

func NewNodeServer(driver *csicommon.CSIDriver, mounter *backend.SnapshotMounter, imageSvc cri.ImageServiceClient, secretStore secret.Store) *NodeServer {
type ImagePullStatus int

func NewNodeServer(driver *csicommon.CSIDriver, mounter *backend.SnapshotMounter, imageSvc cri.ImageServiceClient, secretStore secret.Store, asyncImagePullMount bool) *NodeServer {
return &NodeServer{
DefaultNodeServer: csicommon.NewDefaultNodeServer(driver),
mounter: mounter,
imageSvc: imageSvc,
secretStore: secretStore,
DefaultNodeServer: csicommon.NewDefaultNodeServer(driver),
mounter: mounter,
secretStore: secretStore,
asyncImagePullMount: asyncImagePullMount,
mountExecutor: mountexecutor.NewMountExecutor(&mountexecutor.MountExecutorOptions{
AsyncMount: asyncImagePullMount,
Mounter: mounter,
}),
pullExecutor: pullexecutor.NewPullExecutor(&pullexecutor.PullExecutorOptions{
AsyncPull: asyncImagePullMount,
ImageServiceClient: imageSvc,
SecretStore: secretStore,
Mounter: mounter,
}),
}
}

type NodeServer struct {
*csicommon.DefaultNodeServer
mounter *backend.SnapshotMounter
imageSvc cri.ImageServiceClient
secretStore secret.Store
mounter *backend.SnapshotMounter
secretStore secret.Store
asyncImagePullMount bool
mountExecutor *mountexecutor.MountExecutor
pullExecutor *pullexecutor.PullExecutor
}

func (n NodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublishVolumeRequest) (resp *csi.NodePublishVolumeResponse, err error) {
@@ -103,37 +119,55 @@ func (n NodeServer) NodePublishV
image = req.VolumeContext[ctxKeyImage]
}

namedRef, err := docker.ParseDockerRef(image)
if err != nil {
klog.Errorf("unable to normalize image %q: %s", image, err)
return
}

pullAlways := strings.ToLower(req.VolumeContext[ctxKeyPullAlways]) == "true"

keyring, err := n.secretStore.GetDockerKeyring(ctx, req.Secrets)
if err != nil {
err = status.Errorf(codes.Aborted, "unable to fetch keyring: %s", err)
po := &pullexecutor.PullOptions{
Context: ctx,
NamedRef: namedRef,
PullAlways: pullAlways,
Image: image,
PullSecrets: req.Secrets,
}

if e := n.pullExecutor.StartPulling(po); e != nil {
err = status.Errorf(codes.Aborted, "unable to pull image %q: %s", image, e)
return
}

namedRef, err := docker.ParseDockerRef(image)
if err != nil {
klog.Errorf("unable to normalize image %q: %s", image, err)
if e := n.pullExecutor.WaitForPull(po); e != nil {
err = status.Error(codes.DeadlineExceeded, e.Error())
return
}

puller := remoteimage.NewPuller(n.imageSvc, namedRef, keyring)
if pullAlways || !n.mounter.ImageExists(ctx, namedRef) {
klog.Errorf("pull image %q", image)
if err = puller.Pull(ctx); err != nil {
err = status.Errorf(codes.Aborted, "unable to pull image %q: %s", image, err)
return
}
if mountstatus.Get(req.VolumeId) == mountstatus.Mounted {
return &csi.NodePublishVolumeResponse{}, nil
}

ro := req.Readonly ||
req.VolumeCapability.AccessMode.Mode == csi.VolumeCapability_AccessMode_SINGLE_NODE_READER_ONLY ||
req.VolumeCapability.AccessMode.Mode == csi.VolumeCapability_AccessMode_MULTI_NODE_READER_ONLY
if err = n.mounter.Mount(ctx, req.VolumeId, backend.MountTarget(req.TargetPath), namedRef, ro); err != nil {
o := &mountexecutor.MountOptions{
Context: ctx,
NamedRef: namedRef,
VolumeId: req.VolumeId,
TargetPath: req.TargetPath,
VolumeCapability: req.VolumeCapability,
ReadOnly: req.Readonly,
}

if err = n.mountExecutor.StartMounting(o); err != nil {
err = status.Error(codes.Internal, err.Error())
return
}

if e := n.mountExecutor.WaitForMount(o); e != nil {
err = status.Error(codes.DeadlineExceeded, e.Error())
return
}

return &csi.NodePublishVolumeResponse{}, nil
}

@@ -159,6 +193,11 @@ func (n NodeServer) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpubl
return
}

// Clear the mount status since the volume has been unmounted.
// Otherwise, a subsequent mount of the same volume will not work
// properly.
mountstatus.Delete(req.VolumeId)

return &csi.NodeUnpublishVolumeResponse{}, nil
}

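A side note on the mount path: the read-only computation that used to be inline in `NodePublishVolume` (the `ro := ...` block above) is gone, and `MountOptions` carries both `ReadOnly` and `VolumeCapability`, so the equivalent logic presumably moves into `mountexecutor`. A guess at that helper, reusing the old inline logic (the real package is not part of this excerpt):

```go
package mountexecutor

import (
	"github.com/container-storage-interface/spec/lib/go/csi"
)

// isReadOnly mirrors the logic removed from NodePublishVolume: the mount is
// read-only if the request says so or if the access mode only admits readers.
// (Guesswork; the actual mountexecutor implementation is not shown here.)
func isReadOnly(readOnly bool, vc *csi.VolumeCapability) bool {
	if readOnly {
		return true
	}
	mode := vc.GetAccessMode().GetMode()
	return mode == csi.VolumeCapability_AccessMode_SINGLE_NODE_READER_ONLY ||
		mode == csi.VolumeCapability_AccessMode_MULTI_NODE_READER_ONLY
}
```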
(diffs for the remaining 12 changed files were not loaded)
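`pkg/mountstatus` (like `pkg/pullexecutor` and `pkg/mountexecutor`) is among the files not shown. From the call sites above (`mountstatus.Get`, `mountstatus.Mounted`, `mountstatus.Delete`) and the "update mount status on finishing sync mount" note in the commit message, a plausible sketch of the package (internals are guesses):

```go
// Package mountstatus tracks per-volume mount progress so that repeated
// NodePublishVolume calls can be answered idempotently.
package mountstatus

import "sync"

type MountStatus int

const (
	NotMounted MountStatus = iota
	StillMounting
	Mounted
	Errored
)

var (
	mu       sync.Mutex
	statuses = map[string]MountStatus{}
)

// Get returns the recorded status for a volume (NotMounted if unseen).
func Get(volumeID string) MountStatus {
	mu.Lock()
	defer mu.Unlock()
	return statuses[volumeID]
}

// Update records the latest status for a volume.
func Update(volumeID string, s MountStatus) {
	mu.Lock()
	defer mu.Unlock()
	statuses[volumeID] = s
}

// Delete forgets a volume once it has been unpublished, so a later publish
// of the same volume starts from a clean slate.
func Delete(volumeID string) {
	mu.Lock()
	defer mu.Unlock()
	delete(statuses, volumeID)
}
```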
