
Commit f62d8c9

feat: async image pull and mount
feat: wip add tests to reproduce `context deadline exceeded` problem
test: add test to call `NodePublishVolume` constantly for 2 minutes until the image is downloaded - this should fail for the current code (a sketch follows below)
feat: wip add logic for async pull - not working right now
fix: async image pull timing out - mount is still a bottleneck
feat: wip working async mount - fails with race conditions infrequently (<- looking at this)
feat: fix concurrency errors - abstract mounting and pulling logic into separate packages - add README for running tests
refactor: remove redundant `uuid`-related code
fix: CI lint/fmt errors - use `defer cancel()` instead of discarding the fn - fix log
fix: update mount status on finishing sync mount
chore: remove extra whitespace in the log
refactor: use localized error - to address PR comment and avoid confusion
refactor: define magic numbers as constants
refactor: define mount and pull ctx timeouts as constants
refactor: rename `--async-image-pulls` -> `--async-pull-mount`
fix: CI failing because of image pull error
fix: mounter is nil
fix: image not mounting if the image already exists on the node
fix: mounting the same volume more than once doesn't work
refactor: add log if image hasn't been pulled or mount is still in progress
refactor: make `NewNodeServer` call more compact
refactor: defer unlocking mutex instead of explicitly calling unlock in multiple places
refactor: remove leftover `ctx` in print statement
chore: bump version in Makefile and Chart.yaml
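The reproduction test referenced above is not shown on this page. A minimal sketch of what it presumably does, assuming kubelet-like retry behavior; the package name, per-call timeout, and retry interval are guesses:

package e2e

import (
	"context"
	"testing"
	"time"

	"github.com/container-storage-interface/spec/lib/go/csi"
)

// publishUntilReady calls NodePublishVolume the way kubelet would: each call
// gets a short deadline, and failing calls are retried until the volume is
// published or the two-minute budget is exhausted. Against the synchronous
// code, a large image makes every call exceed its deadline, so the test fails
// with `context deadline exceeded`.
func publishUntilReady(t *testing.T, ns csi.NodeServer, req *csi.NodePublishVolumeRequest) {
	deadline := time.Now().Add(2 * time.Minute)
	for time.Now().Before(deadline) {
		ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) // hypothetical per-call deadline
		_, err := ns.NodePublishVolume(ctx, req)
		cancel()
		if err == nil {
			return // the volume is mounted
		}
		time.Sleep(time.Second) // hypothetical retry interval
	}
	t.Fatal("volume was not published within 2 minutes")
}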
1 parent 40166cf commit f62d8c9

17 files changed, +922 -32 lines changed

.gitignore

Lines changed: 2 additions & 1 deletion
@@ -1,4 +1,5 @@
 /_output
 /.idea
 id_rsa*
-.cache
+.cache
+vendor

Makefile

Lines changed: 1 addition & 1 deletion
@@ -1,4 +1,4 @@
-VERSION ?= v0.7.0
+VERSION ?= v0.8.0
 
 IMAGE_BUILDER ?= docker
 IMAGE_BUILD_CMD ?= buildx

charts/warm-metal-csi-driver/Chart.yaml

Lines changed: 2 additions & 2 deletions
@@ -15,9 +15,9 @@ type: application
 # This is the chart version. This version number should be incremented each time you make changes
 # to the chart and its templates, including the app version.
 # Versions are expected to follow Semantic Versioning (https://semver.org/)
-version: 0.7.0
+version: 0.8.0
 
 # This is the version number of the application being deployed. This version number should be
 # incremented each time you make changes to the application. Versions are not expected to
 # follow Semantic Versioning. They should reflect the version the application is using.
-appVersion: v0.7.0
+appVersion: v0.8.0

cmd/plugin/main.go

Lines changed: 5 additions & 1 deletion
@@ -51,6 +51,8 @@ var (
 	enableCache = flag.Bool("enable-daemon-image-credential-cache", true,
 		"Whether to save contents of imagepullsecrets of the daemon ServiceAccount in memory. "+
 			"If set to false, secrets will be fetched from the API server on every image pull.")
+	asyncImagePullMount = flag.Bool("async-pull-mount", false,
+		"Whether to pull images asynchronously (helps prevent timeout for larger images)")
 	watcherResyncPeriod = flag.Duration("watcher-resync-period", 30*time.Minute, "The resync period of the pvc watcher.")
 	mode                = flag.String("mode", "", "The mode of the driver. Valid values are: node, controller")
 	nodePluginSA        = flag.String("node-plugin-sa", "csi-image-warm-metal", "The name of the ServiceAccount used by the node plugin.")
@@ -122,10 +124,12 @@ func main() {
 			klog.Fatalf(`unable to connect to cri daemon "%s": %s`, *endpoint, err)
 		}
 
+		secretStore := secret.CreateStoreOrDie(*icpConf, *icpBin, *nodePluginSA, *enableCache)
+
 		server.Start(*endpoint,
 			NewIdentityServer(driverVersion),
 			nil,
-			NewNodeServer(driver, mounter, criClient, secret.CreateStoreOrDie(*icpConf, *icpBin, *nodePluginSA, *enableCache)))
+			NewNodeServer(driver, mounter, criClient, secretStore, *asyncImagePullMount))
 	case controllerMode:
 		watcher, err := watcher.New(context.Background(), *watcherResyncPeriod)
 		if err != nil {

cmd/plugin/node_server.go

Lines changed: 65 additions & 26 deletions
@@ -8,7 +8,9 @@ import (
 	"github.com/container-storage-interface/spec/lib/go/csi"
 	"github.com/containerd/containerd/reference/docker"
 	"github.com/warm-metal/csi-driver-image/pkg/backend"
-	"github.com/warm-metal/csi-driver-image/pkg/remoteimage"
+	"github.com/warm-metal/csi-driver-image/pkg/mountexecutor"
+	"github.com/warm-metal/csi-driver-image/pkg/mountstatus"
+	"github.com/warm-metal/csi-driver-image/pkg/pullexecutor"
 	"github.com/warm-metal/csi-driver-image/pkg/secret"
 	csicommon "github.com/warm-metal/csi-drivers/pkg/csi-common"
 	"google.golang.org/grpc/codes"
@@ -25,20 +27,34 @@ const (
 	ctxKeyEphemeralVolume = "csi.storage.k8s.io/ephemeral"
 )
 
-func NewNodeServer(driver *csicommon.CSIDriver, mounter *backend.SnapshotMounter, imageSvc cri.ImageServiceClient, secretStore secret.Store) *NodeServer {
+type ImagePullStatus int
+
+func NewNodeServer(driver *csicommon.CSIDriver, mounter *backend.SnapshotMounter, imageSvc cri.ImageServiceClient, secretStore secret.Store, asyncImagePullMount bool) *NodeServer {
 	return &NodeServer{
-		DefaultNodeServer: csicommon.NewDefaultNodeServer(driver),
-		mounter:           mounter,
-		imageSvc:          imageSvc,
-		secretStore:       secretStore,
+		DefaultNodeServer:   csicommon.NewDefaultNodeServer(driver),
+		mounter:             mounter,
+		secretStore:         secretStore,
+		asyncImagePullMount: asyncImagePullMount,
+		mountExecutor: mountexecutor.NewMountExecutor(&mountexecutor.MountExecutorOptions{
+			AsyncMount: asyncImagePullMount,
+			Mounter:    mounter,
+		}),
+		pullExecutor: pullexecutor.NewPullExecutor(&pullexecutor.PullExecutorOptions{
+			AsyncPull:          asyncImagePullMount,
+			ImageServiceClient: imageSvc,
+			SecretStore:        secretStore,
+			Mounter:            mounter,
+		}),
 	}
 }
 
 type NodeServer struct {
 	*csicommon.DefaultNodeServer
-	mounter     *backend.SnapshotMounter
-	imageSvc    cri.ImageServiceClient
-	secretStore secret.Store
+	mounter             *backend.SnapshotMounter
+	secretStore         secret.Store
+	asyncImagePullMount bool
+	mountExecutor       *mountexecutor.MountExecutor
+	pullExecutor        *pullexecutor.PullExecutor
 }
 
 func (n NodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublishVolumeRequest) (resp *csi.NodePublishVolumeResponse, err error) {
@@ -103,37 +119,55 @@ func (n NodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublishV
 		image = req.VolumeContext[ctxKeyImage]
 	}
 
+	namedRef, err := docker.ParseDockerRef(image)
+	if err != nil {
+		klog.Errorf("unable to normalize image %q: %s", image, err)
+		return
+	}
+
 	pullAlways := strings.ToLower(req.VolumeContext[ctxKeyPullAlways]) == "true"
 
-	keyring, err := n.secretStore.GetDockerKeyring(ctx, req.Secrets)
-	if err != nil {
-		err = status.Errorf(codes.Aborted, "unable to fetch keyring: %s", err)
+	po := &pullexecutor.PullOptions{
+		Context:     ctx,
+		NamedRef:    namedRef,
+		PullAlways:  pullAlways,
+		Image:       image,
+		PullSecrets: req.Secrets,
+	}
+
+	if e := n.pullExecutor.StartPulling(po); e != nil {
+		err = status.Errorf(codes.Aborted, "unable to pull image %q: %s", image, e)
 		return
 	}
 
-	namedRef, err := docker.ParseDockerRef(image)
-	if err != nil {
-		klog.Errorf("unable to normalize image %q: %s", image, err)
+	if e := n.pullExecutor.WaitForPull(po); e != nil {
+		err = status.Errorf(codes.DeadlineExceeded, e.Error())
 		return
 	}
 
-	puller := remoteimage.NewPuller(n.imageSvc, namedRef, keyring)
-	if pullAlways || !n.mounter.ImageExists(ctx, namedRef) {
-		klog.Errorf("pull image %q", image)
-		if err = puller.Pull(ctx); err != nil {
-			err = status.Errorf(codes.Aborted, "unable to pull image %q: %s", image, err)
-			return
-		}
+	if mountstatus.Get(req.VolumeId) == mountstatus.Mounted {
+		return &csi.NodePublishVolumeResponse{}, nil
 	}
 
-	ro := req.Readonly ||
-		req.VolumeCapability.AccessMode.Mode == csi.VolumeCapability_AccessMode_SINGLE_NODE_READER_ONLY ||
-		req.VolumeCapability.AccessMode.Mode == csi.VolumeCapability_AccessMode_MULTI_NODE_READER_ONLY
-	if err = n.mounter.Mount(ctx, req.VolumeId, backend.MountTarget(req.TargetPath), namedRef, ro); err != nil {
+	o := &mountexecutor.MountOptions{
+		Context:          ctx,
+		NamedRef:         namedRef,
+		VolumeId:         req.VolumeId,
+		TargetPath:       req.TargetPath,
+		VolumeCapability: req.VolumeCapability,
+		ReadOnly:         req.Readonly,
+	}
+
+	if err = n.mountExecutor.StartMounting(o); err != nil {
 		err = status.Error(codes.Internal, err.Error())
 		return
 	}
 
+	if e := n.mountExecutor.WaitForMount(o); e != nil {
+		err = status.Errorf(codes.DeadlineExceeded, e.Error())
+		return
+	}
+
 	return &csi.NodePublishVolumeResponse{}, nil
 }
 
@@ -159,6 +193,11 @@ func (n NodeServer) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpubl
 		return
 	}
 
+	// Clear the mount status since the volume has been unmounted.
+	// Without this, mounting the same volume a second time would not work,
+	// because the stale Mounted entry would short-circuit NodePublishVolume.
+	mountstatus.Delete(req.VolumeId)
+
 	return &csi.NodeUnpublishVolumeResponse{}, nil
 }
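Note: the pullexecutor and mountexecutor packages used above are added elsewhere in this commit and are not shown on this page. A minimal sketch of the StartPulling/WaitForPull pattern the diff implies, assuming a mutex-guarded map of in-flight pulls; the type, field, and constant names here are guesses:

package pullexecutor

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// pullCtxTimeout bounds the background pull itself; the commit defines its
// own constant for this, the value here is a placeholder.
const pullCtxTimeout = 10 * time.Minute

type pullResult struct {
	done chan struct{} // closed when the pull finishes
	err  error         // valid only after done is closed
}

type PullExecutor struct {
	asyncPull bool
	pull      func(ctx context.Context, image string) error
	mu        sync.Mutex
	pulls     map[string]*pullResult // in-flight and finished pulls, keyed by image
}

func NewPullExecutor(asyncPull bool, pull func(context.Context, string) error) *PullExecutor {
	return &PullExecutor{asyncPull: asyncPull, pull: pull, pulls: map[string]*pullResult{}}
}

// StartPulling runs the pull inline in sync mode. In async mode it starts the
// pull in a goroutine (at most one per image) so the CSI RPC is not blocked
// for the whole download.
func (p *PullExecutor) StartPulling(image string) error {
	if !p.asyncPull {
		return p.pull(context.Background(), image)
	}
	p.mu.Lock()
	defer p.mu.Unlock()
	if _, ok := p.pulls[image]; ok {
		return nil // a pull for this image is already running or done
	}
	r := &pullResult{done: make(chan struct{})}
	p.pulls[image] = r
	go func() {
		ctx, cancel := context.WithTimeout(context.Background(), pullCtxTimeout)
		defer cancel()
		r.err = p.pull(ctx, image)
		close(r.done)
	}()
	return nil
}

// WaitForPull blocks until the pull finishes or the caller's context expires.
// On expiry the RPC returns DeadlineExceeded while the pull keeps running in
// the background, so a later kubelet retry of NodePublishVolume succeeds.
func (p *PullExecutor) WaitForPull(ctx context.Context, image string) error {
	p.mu.Lock()
	r, ok := p.pulls[image]
	p.mu.Unlock()
	if !ok {
		return nil // nothing in flight (sync mode already pulled inline)
	}
	select {
	case <-r.done:
		return r.err
	case <-ctx.Done():
		return fmt.Errorf("pull of image %q still in progress: %w", image, ctx.Err())
	}
}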

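Likewise, mountstatus (used via Get, Mounted, and Delete above) is presumably a small concurrency-safe registry of per-volume mount states, along these lines; everything except Get, Delete, and Mounted is an assumption:

package mountstatus

import "sync"

// Status tracks how far a volume has gotten through the mount flow.
type Status int

const (
	NotMounted    Status = iota // hypothetical zero value returned for unknown volumes
	StillMounting               // hypothetical in-progress state
	Mounted                     // referenced by node_server.go
)

var (
	mu       sync.RWMutex
	statuses = map[string]Status{}
)

// Update records the mount state of a volume (the mount executor would call
// this when a mount starts and when it finishes).
func Update(volumeID string, s Status) {
	mu.Lock()
	defer mu.Unlock()
	statuses[volumeID] = s
}

// Get returns the recorded state, or NotMounted for unknown volumes.
func Get(volumeID string) Status {
	mu.RLock()
	defer mu.RUnlock()
	return statuses[volumeID]
}

// Delete clears the record so the same volume can be mounted again later;
// NodeUnpublishVolume calls this after a successful unmount.
func Delete(volumeID string) {
	mu.Lock()
	defer mu.Unlock()
	delete(statuses, volumeID)
}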