From 7189e2ad7efb15804f30048fceadb6e3f462a8d8 Mon Sep 17 00:00:00 2001 From: vadasambar Date: Tue, 31 Oct 2023 20:59:10 +0530 Subject: [PATCH] feat: async image pull and mount feat: wip add tests to reproduce `context deadline exceeded` problem test: add test to call `NodePublishVolume` constantly for 2 minutes until image is downloaded - this should fail for the current code feat: wip add logic for async pull - not working right now fix: async image pull timing out - mount is still a bottleneck feat: wip working async mount - fails with race conditions infrequently (<- looking at this) feat: fix concurrency errors - abstract mounting and pulling logic to separate packages - add readme for running tests refactor: remove redundant `uuid` related code fix: CI lint/fmt errors - use `defer cancel()` instead of discarding the fn - fix log fix: update mount status on finishing sync mount chore: remove extra whitespace in the log refactor: use localized error - to address PR comment and avoid confusion refactor: define magic numbers as constants refactor: define mount and pull ctx timeout as constants refactor: rename `--async-image-pulls` -> `--async-pull-mount` fix: ci failing because of image pull error fix: mounter is nil fix: image not mounting if the image already exists on the node fix: mounting the same volume more than once doesn't work refactor: add log if image hasn't been pulled or mount is still in progress refactor: make `NewNodeServer` call more compact refactor: defer unlocking mutex instead of explicitly calling unlock at multiple places refactor: remove leftover `ctx` in print statement chore: bump version in Makefile and Chart.yaml chore: revert bump to `v0.8.0` - causes CI to fail fix: remove conflict markers in Chart.yaml - was causing CI to fail chore: add a space after Chart.yaml - to remove newline diff in the PR --- .gitignore | 3 +- cmd/plugin/main.go | 6 +- cmd/plugin/node_server.go | 91 ++++-- cmd/plugin/node_server_test.go | 281 ++++++++++++++++++ go.mod | 2 + pkg/backend/mounter.go | 3 +- pkg/mountexecutor/mountexecutor.go | 129 ++++++++ pkg/mountstatus/mountstatus.go | 63 ++++ pkg/pullexecutor/pullexecutor.go | 148 +++++++++ pkg/pullstatus/pullstatus.go | 65 ++++ pkg/remoteimage/pull.go | 3 +- pkg/remoteimage/pull_test.go | 30 ++ .../node-server/Dockerfile.containerd | 28 ++ test/integration/node-server/README.md | 83 ++++++ .../node-server/docker-compose.yaml | 9 + 15 files changed, 913 insertions(+), 31 deletions(-) create mode 100644 cmd/plugin/node_server_test.go create mode 100644 pkg/mountexecutor/mountexecutor.go create mode 100644 pkg/mountstatus/mountstatus.go create mode 100644 pkg/pullexecutor/pullexecutor.go create mode 100644 pkg/pullstatus/pullstatus.go create mode 100644 pkg/remoteimage/pull_test.go create mode 100644 test/integration/node-server/Dockerfile.containerd create mode 100644 test/integration/node-server/README.md create mode 100644 test/integration/node-server/docker-compose.yaml diff --git a/.gitignore b/.gitignore index 0e57c20..517b064 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,5 @@ /_output /.idea id_rsa* -.cache \ No newline at end of file +.cache +vendor \ No newline at end of file diff --git a/cmd/plugin/main.go b/cmd/plugin/main.go index a7fbfcf..550b889 100644 --- a/cmd/plugin/main.go +++ b/cmd/plugin/main.go @@ -51,6 +51,8 @@ var ( enableCache = flag.Bool("enable-daemon-image-credential-cache", true, "Whether to save contents of imagepullsecrets of the daemon ServiceAccount in memory. 
"+ "If set to false, secrets will be fetched from the API server on every image pull.") + asyncImagePullMount = flag.Bool("async-pull-mount", false, + "Whether to pull images asynchronously (helps prevent timeout for larger images)") watcherResyncPeriod = flag.Duration("watcher-resync-period", 30*time.Minute, "The resync period of the pvc watcher.") mode = flag.String("mode", "", "The mode of the driver. Valid values are: node, controller") nodePluginSA = flag.String("node-plugin-sa", "csi-image-warm-metal", "The name of the ServiceAccount used by the node plugin.") @@ -122,10 +124,12 @@ func main() { klog.Fatalf(`unable to connect to cri daemon "%s": %s`, *endpoint, err) } + secretStore := secret.CreateStoreOrDie(*icpConf, *icpBin, *nodePluginSA, *enableCache) + server.Start(*endpoint, NewIdentityServer(driverVersion), nil, - NewNodeServer(driver, mounter, criClient, secret.CreateStoreOrDie(*icpConf, *icpBin, *nodePluginSA, *enableCache))) + NewNodeServer(driver, mounter, criClient, secretStore, *asyncImagePullMount)) case controllerMode: watcher, err := watcher.New(context.Background(), *watcherResyncPeriod) if err != nil { diff --git a/cmd/plugin/node_server.go b/cmd/plugin/node_server.go index 7da1400..108de79 100644 --- a/cmd/plugin/node_server.go +++ b/cmd/plugin/node_server.go @@ -8,7 +8,9 @@ import ( "github.com/container-storage-interface/spec/lib/go/csi" "github.com/containerd/containerd/reference/docker" "github.com/warm-metal/csi-driver-image/pkg/backend" - "github.com/warm-metal/csi-driver-image/pkg/remoteimage" + "github.com/warm-metal/csi-driver-image/pkg/mountexecutor" + "github.com/warm-metal/csi-driver-image/pkg/mountstatus" + "github.com/warm-metal/csi-driver-image/pkg/pullexecutor" "github.com/warm-metal/csi-driver-image/pkg/secret" csicommon "github.com/warm-metal/csi-drivers/pkg/csi-common" "google.golang.org/grpc/codes" @@ -25,20 +27,34 @@ const ( ctxKeyEphemeralVolume = "csi.storage.k8s.io/ephemeral" ) -func NewNodeServer(driver *csicommon.CSIDriver, mounter *backend.SnapshotMounter, imageSvc cri.ImageServiceClient, secretStore secret.Store) *NodeServer { +type ImagePullStatus int + +func NewNodeServer(driver *csicommon.CSIDriver, mounter *backend.SnapshotMounter, imageSvc cri.ImageServiceClient, secretStore secret.Store, asyncImagePullMount bool) *NodeServer { return &NodeServer{ - DefaultNodeServer: csicommon.NewDefaultNodeServer(driver), - mounter: mounter, - imageSvc: imageSvc, - secretStore: secretStore, + DefaultNodeServer: csicommon.NewDefaultNodeServer(driver), + mounter: mounter, + secretStore: secretStore, + asyncImagePullMount: asyncImagePullMount, + mountExecutor: mountexecutor.NewMountExecutor(&mountexecutor.MountExecutorOptions{ + AsyncMount: asyncImagePullMount, + Mounter: mounter, + }), + pullExecutor: pullexecutor.NewPullExecutor(&pullexecutor.PullExecutorOptions{ + AsyncPull: asyncImagePullMount, + ImageServiceClient: imageSvc, + SecretStore: secretStore, + Mounter: mounter, + }), } } type NodeServer struct { *csicommon.DefaultNodeServer - mounter *backend.SnapshotMounter - imageSvc cri.ImageServiceClient - secretStore secret.Store + mounter *backend.SnapshotMounter + secretStore secret.Store + asyncImagePullMount bool + mountExecutor *mountexecutor.MountExecutor + pullExecutor *pullexecutor.PullExecutor } func (n NodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublishVolumeRequest) (resp *csi.NodePublishVolumeResponse, err error) { @@ -103,37 +119,55 @@ func (n NodeServer) NodePublishVolume(ctx context.Context, req 
*csi.NodePublishV
 		image = req.VolumeContext[ctxKeyImage]
 	}
 
+	namedRef, err := docker.ParseDockerRef(image)
+	if err != nil {
+		klog.Errorf("unable to normalize image %q: %s", image, err)
+		return
+	}
+
 	pullAlways := strings.ToLower(req.VolumeContext[ctxKeyPullAlways]) == "true"
 
-	keyring, err := n.secretStore.GetDockerKeyring(ctx, req.Secrets)
-	if err != nil {
-		err = status.Errorf(codes.Aborted, "unable to fetch keyring: %s", err)
+	po := &pullexecutor.PullOptions{
+		Context:     ctx,
+		NamedRef:    namedRef,
+		PullAlways:  pullAlways,
+		Image:       image,
+		PullSecrets: req.Secrets,
+	}
+
+	if e := n.pullExecutor.StartPulling(po); e != nil {
+		err = status.Errorf(codes.Aborted, "unable to pull image %q: %s", image, e)
 		return
 	}
 
-	namedRef, err := docker.ParseDockerRef(image)
-	if err != nil {
-		klog.Errorf("unable to normalize image %q: %s", image, err)
+	if e := n.pullExecutor.WaitForPull(po); e != nil {
+		err = status.Errorf(codes.DeadlineExceeded, e.Error())
 		return
 	}
 
-	puller := remoteimage.NewPuller(n.imageSvc, namedRef, keyring)
-	if pullAlways || !n.mounter.ImageExists(ctx, namedRef) {
-		klog.Errorf("pull image %q", image)
-		if err = puller.Pull(ctx); err != nil {
-			err = status.Errorf(codes.Aborted, "unable to pull image %q: %s", image, err)
-			return
-		}
+	if mountstatus.Get(req.VolumeId) == mountstatus.Mounted {
+		return &csi.NodePublishVolumeResponse{}, nil
 	}
 
-	ro := req.Readonly ||
-		req.VolumeCapability.AccessMode.Mode == csi.VolumeCapability_AccessMode_SINGLE_NODE_READER_ONLY ||
-		req.VolumeCapability.AccessMode.Mode == csi.VolumeCapability_AccessMode_MULTI_NODE_READER_ONLY
-	if err = n.mounter.Mount(ctx, req.VolumeId, backend.MountTarget(req.TargetPath), namedRef, ro); err != nil {
+	o := &mountexecutor.MountOptions{
+		Context:          ctx,
+		NamedRef:         namedRef,
+		VolumeId:         req.VolumeId,
+		TargetPath:       req.TargetPath,
+		VolumeCapability: req.VolumeCapability,
+		ReadOnly:         req.Readonly,
+	}
+
+	if err = n.mountExecutor.StartMounting(o); err != nil {
 		err = status.Error(codes.Internal, err.Error())
 		return
 	}
 
+	if e := n.mountExecutor.WaitForMount(o); e != nil {
+		err = status.Errorf(codes.DeadlineExceeded, e.Error())
+		return
+	}
+
 	return &csi.NodePublishVolumeResponse{}, nil
 }
 
@@ -159,6 +193,11 @@ func (n NodeServer) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpubl
 		return
 	}
 
+	// Clear the mount status since the volume has been unmounted.
+	// Not doing this would prevent the same volume from being
+	// mounted again.
+	mountstatus.Delete(req.VolumeId)
+
 	return &csi.NodeUnpublishVolumeResponse{}, nil
 }
 
diff --git a/cmd/plugin/node_server_test.go b/cmd/plugin/node_server_test.go
new file mode 100644
index 0000000..12ae09d
--- /dev/null
+++ b/cmd/plugin/node_server_test.go
@@ -0,0 +1,281 @@
+package main
+
+import (
+	"context"
+	"fmt"
+	"net"
+	"net/url"
+	"os"
+	"strings"
+	"sync"
+	"testing"
+	"time"
+
+	"github.com/container-storage-interface/spec/lib/go/csi"
+	csipbv1 "github.com/container-storage-interface/spec/lib/go/csi"
+	"github.com/stretchr/testify/assert"
+	"github.com/warm-metal/csi-driver-image/pkg/backend"
+	"github.com/warm-metal/csi-driver-image/pkg/backend/containerd"
+	"github.com/warm-metal/csi-driver-image/pkg/cri"
+	csicommon "github.com/warm-metal/csi-drivers/pkg/csi-common"
+	"google.golang.org/grpc"
+	"k8s.io/apimachinery/pkg/util/wait"
+	"k8s.io/klog/v2"
+	"k8s.io/kubernetes/pkg/credentialprovider"
+)
+
+// Check test/integration/node-server/README.md for how to run this test correctly
+func TestNodePublishVolumeAsync(t *testing.T) {
+	
socketAddr := "unix:///run/containerd/containerd.sock"
+	addr, err := url.Parse(socketAddr)
+	assert.NoError(t, err)
+
+	criClient, err := cri.NewRemoteImageService(socketAddr, time.Minute)
+	assert.NoError(t, err)
+	assert.NotNil(t, criClient)
+
+	mounter := containerd.NewMounter(addr.Path)
+	assert.NotNil(t, mounter)
+
+	driver := csicommon.NewCSIDriver(driverName, driverVersion, "fake-node")
+	assert.NotNil(t, driver)
+
+	asyncImagePulls := true
+	ns := NewNodeServer(driver, mounter, criClient, &testSecretStore{}, asyncImagePulls)
+
+	// based on kubelet's csi mounter plugin code
+	// check https://github.com/kubernetes/kubernetes/blob/b06a31b87235784bad2858be62115049b6eb6bcd/pkg/volume/csi/csi_mounter.go#L111-L112
+	timeout := 100 * time.Millisecond
+
+	volId := "docker.io/library/redis:latest"
+	target := "test-path"
+	req := &csi.NodePublishVolumeRequest{
+		VolumeId:   volId,
+		TargetPath: target,
+		VolumeContext: map[string]string{
+			// so that the test would always attempt to pull an image
+			ctxKeyPullAlways: "true",
+		},
+		VolumeCapability: &csi.VolumeCapability{
+			AccessMode: &csi.VolumeCapability_AccessMode{
+				Mode: csi.VolumeCapability_AccessMode_SINGLE_NODE_READER_ONLY,
+			},
+		},
+	}
+
+	server := csicommon.NewNonBlockingGRPCServer()
+
+	addr, err = url.Parse(*endpoint)
+	assert.NoError(t, err)
+
+	os.Remove("/csi/csi.sock")
+
+	// automatically deleted when the server is stopped
+	f, err := os.Create("/csi/csi.sock")
+	assert.NoError(t, err)
+	assert.NotNil(t, f)
+
+	var wg sync.WaitGroup
+	wg.Add(1)
+	go func() {
+
+		server.Start(*endpoint,
+			nil,
+			nil,
+			ns)
+		// signal that the GRPC server has been started
+		wg.Done()
+		server.Wait()
+	}()
+
+	// give some time for the server to start
+	time.Sleep(2 * time.Second)
+	defer func() {
+		klog.Info("server was stopped")
+		server.Stop()
+	}()
+
+	wg.Wait()
+	var conn *grpc.ClientConn
+
+	conn, err = grpc.Dial(
+		addr.Path,
+		grpc.WithInsecure(),
+		grpc.WithContextDialer(func(ctx context.Context, targetPath string) (net.Conn, error) {
+			return (&net.Dialer{}).DialContext(ctx, "unix", targetPath)
+		}),
+	)
+
+	if err != nil {
+		panic(err)
+	}
+
+	assert.NoError(t, err)
+	assert.NotNil(t, conn)
+
+	nodeClient := csipbv1.NewNodeClient(conn)
+	assert.NotNil(t, nodeClient)
+
+	condFn := func() (done bool, err error) {
+		ctx, cancel := context.WithTimeout(context.Background(), timeout)
+		defer cancel()
+		resp, err := nodeClient.NodePublishVolume(ctx, req)
+		if err != nil && strings.Contains(err.Error(), context.DeadlineExceeded.Error()) {
+			klog.Errorf("context deadline exceeded; retrying: %v", err)
+			return false, nil
+		}
+		if resp != nil {
+			return true, nil
+		}
+		return false, fmt.Errorf("response from `NodePublishVolume` is nil")
+	}
+
+	err = wait.PollImmediate(
+		timeout,
+		30*time.Second,
+		condFn)
+	assert.NoError(t, err)
+
+	// give some time before stopping the server
+	time.Sleep(5 * time.Second)
+
+	// unmount if the volume is already mounted
+	c, ca := context.WithTimeout(context.Background(), time.Second*10)
+	defer ca()
+
+	err = mounter.Unmount(c, volId, backend.MountTarget(target))
+	assert.NoError(t, err)
+}
+
+// Check test/integration/node-server/README.md for how to run this test correctly
+func TestNodePublishVolumeSync(t *testing.T) {
+	socketAddr := "unix:///run/containerd/containerd.sock"
+	addr, err := url.Parse(socketAddr)
+	assert.NoError(t, err)
+
+	criClient, err := cri.NewRemoteImageService(socketAddr, time.Minute)
+	assert.NoError(t, err)
+	assert.NotNil(t, criClient)
+
+	mounter := containerd.NewMounter(addr.Path)
+	
assert.NotNil(t, mounter)
+
+	driver := csicommon.NewCSIDriver(driverName, driverVersion, "fake-node")
+	assert.NotNil(t, driver)
+
+	asyncImagePulls := false
+	ns := NewNodeServer(driver, mounter, criClient, &testSecretStore{}, asyncImagePulls)
+
+	// based on kubelet's csi mounter plugin code
+	// check https://github.com/kubernetes/kubernetes/blob/b06a31b87235784bad2858be62115049b6eb6bcd/pkg/volume/csi/csi_mounter.go#L111-L112
+	timeout := 100 * time.Millisecond
+
+	volId := "docker.io/library/redis:latest"
+	target := "test-path"
+	req := &csi.NodePublishVolumeRequest{
+		VolumeId:   volId,
+		TargetPath: target,
+		VolumeContext: map[string]string{
+			// so that the test would always attempt to pull an image
+			ctxKeyPullAlways: "true",
+		},
+		VolumeCapability: &csi.VolumeCapability{
+			AccessMode: &csi.VolumeCapability_AccessMode{
+				Mode: csi.VolumeCapability_AccessMode_SINGLE_NODE_READER_ONLY,
+			},
+		},
+	}
+
+	server := csicommon.NewNonBlockingGRPCServer()
+
+	addr, err = url.Parse(*endpoint)
+	assert.NoError(t, err)
+
+	os.Remove("/csi/csi.sock")
+
+	// automatically deleted when the server is stopped
+	f, err := os.Create("/csi/csi.sock")
+	assert.NoError(t, err)
+	assert.NotNil(t, f)
+
+	var wg sync.WaitGroup
+	wg.Add(1)
+	go func() {
+
+		server.Start(*endpoint,
+			nil,
+			nil,
+			ns)
+		// signal that the GRPC server has been started
+		wg.Done()
+		server.Wait()
+	}()
+
+	// give some time for the server to start
+	time.Sleep(2 * time.Second)
+	defer func() {
+		klog.Info("server was stopped")
+		server.Stop()
+	}()
+
+	wg.Wait()
+	var conn *grpc.ClientConn
+
+	conn, err = grpc.Dial(
+		addr.Path,
+		grpc.WithInsecure(),
+		grpc.WithContextDialer(func(ctx context.Context, targetPath string) (net.Conn, error) {
+			return (&net.Dialer{}).DialContext(ctx, "unix", targetPath)
+		}),
+	)
+
+	if err != nil {
+		panic(err)
+	}
+
+	assert.NoError(t, err)
+	assert.NotNil(t, conn)
+
+	nodeClient := csipbv1.NewNodeClient(conn)
+	assert.NotNil(t, nodeClient)
+
+	condFn := func() (done bool, err error) {
+		ctx, cancel := context.WithTimeout(context.Background(), timeout)
+		defer cancel()
+		resp, err := nodeClient.NodePublishVolume(ctx, req)
+		if err != nil && strings.Contains(err.Error(), context.DeadlineExceeded.Error()) {
+			klog.Errorf("context deadline exceeded; retrying: %v", err)
+			return false, nil
+		}
+		if resp != nil {
+			return true, nil
+		}
+		return false, fmt.Errorf("response from `NodePublishVolume` is nil")
+	}
+
+	err = wait.PollImmediate(
+		timeout,
+		30*time.Second,
+		condFn)
+
+	assert.Error(t, err)
+	assert.ErrorContains(t, err, "timed out waiting for the condition")
+
+	// give some time before stopping the server
+	time.Sleep(5 * time.Second)
+
+	// unmount if the volume is already mounted
+	c, ca := context.WithTimeout(context.Background(), time.Second*10)
+	defer ca()
+
+	err = mounter.Unmount(c, volId, backend.MountTarget(target))
+	assert.Error(t, err)
+	assert.ErrorContains(t, err, "not found")
+}
+
+type testSecretStore struct {
+}
+
+func (t *testSecretStore) GetDockerKeyring(ctx context.Context, secrets map[string]string) (credentialprovider.DockerKeyring, error) {
+	return credentialprovider.UnionDockerKeyring{credentialprovider.NewDockerKeyring()}, nil
+}
diff --git a/go.mod b/go.mod
index 1e4b0fa..3b5fb75 100644
--- a/go.mod
+++ b/go.mod
@@ -12,6 +12,7 @@ require (
 	github.com/opencontainers/image-spec v1.1.0-rc2
 	github.com/pkg/errors v0.9.1
 	github.com/spf13/pflag v1.0.5
+	github.com/stretchr/testify v1.8.0
 	github.com/warm-metal/csi-drivers v0.5.0-alpha.0.0.20210404173852-9ec9cb097dd2
golang.org/x/net v0.0.0-20221004154528-8021a29435af
 	google.golang.org/grpc v1.50.0
@@ -91,6 +92,7 @@ require (
 	github.com/opencontainers/runc v1.1.4 // indirect
 	github.com/opencontainers/runtime-spec v1.0.3-0.20210326190908-1c3f411f0417 // indirect
 	github.com/opencontainers/selinux v1.10.2 // indirect
+	github.com/pmezard/go-difflib v1.0.0 // indirect
 	github.com/prometheus/client_golang v1.12.1 // indirect
 	github.com/prometheus/client_model v0.2.0 // indirect
 	github.com/prometheus/common v0.32.1 // indirect
diff --git a/pkg/backend/mounter.go b/pkg/backend/mounter.go
index 1cc40ba..8871758 100644
--- a/pkg/backend/mounter.go
+++ b/pkg/backend/mounter.go
@@ -164,8 +164,7 @@ func (s *SnapshotMounter) unrefROSnapshot(ctx context.Context, target MountTarge
 }
 
 func (s *SnapshotMounter) Mount(
-	ctx context.Context, volumeId string, target MountTarget, image docker.Named, ro bool,
-) (err error) {
+	ctx context.Context, volumeId string, target MountTarget, image docker.Named, ro bool) (err error) {
 	var key SnapshotKey
 	imageID := s.runtime.GetImageIDOrDie(ctx, image)
 	if ro {
diff --git a/pkg/mountexecutor/mountexecutor.go b/pkg/mountexecutor/mountexecutor.go
new file mode 100644
index 0000000..5d2e42f
--- /dev/null
+++ b/pkg/mountexecutor/mountexecutor.go
@@ -0,0 +1,130 @@
+package mountexecutor
+
+import (
+	"context"
+	"fmt"
+	"sync"
+	"time"
+
+	"github.com/container-storage-interface/spec/lib/go/csi"
+	"github.com/containerd/containerd/reference/docker"
+	"github.com/warm-metal/csi-driver-image/pkg/backend"
+	"github.com/warm-metal/csi-driver-image/pkg/mountstatus"
+	"github.com/warm-metal/csi-driver-image/pkg/pullstatus"
+	"k8s.io/apimachinery/pkg/util/wait"
+	"k8s.io/klog/v2"
+)
+
+const (
+	mountPollTimeInterval = 100 * time.Millisecond
+	mountPollTimeout      = 2 * time.Minute
+	mountCtxTimeout       = 10 * time.Minute
+)
+
+// MountExecutorOptions are options passed to mount executor
+type MountExecutorOptions struct {
+	AsyncMount bool
+	Mounter    *backend.SnapshotMounter
+}
+
+// MountOptions are options for a single mount request
+type MountOptions struct {
+	// Context here is only valid for synchronous mounts
+	Context          context.Context
+	NamedRef         docker.Named
+	VolumeId         string
+	TargetPath       string
+	VolumeCapability *csi.VolumeCapability
+	ReadOnly         bool
+}
+
+// MountExecutor executes mount
+type MountExecutor struct {
+	asyncMount bool
+	mutex      *sync.Mutex
+	mounter    *backend.SnapshotMounter
+	asyncErrs  map[docker.Named]error
+}
+
+// NewMountExecutor initializes a new mount executor
+func NewMountExecutor(o *MountExecutorOptions) *MountExecutor {
+	return &MountExecutor{
+		asyncMount: o.AsyncMount,
+		mutex:      &sync.Mutex{},
+		mounter:    o.Mounter,
+		asyncErrs:  make(map[docker.Named]error),
+	}
+}
+
+// StartMounting starts the mounting
+func (m *MountExecutor) StartMounting(o *MountOptions) error {
+
+	if pullstatus.Get(o.NamedRef) != pullstatus.Pulled || mountstatus.Get(o.VolumeId) == mountstatus.StillMounting {
+		klog.Infof("image '%s' hasn't been pulled yet (status: %s) or volume is still mounting (status: %s)",
+			o.NamedRef.Name(),
+			pullstatus.Get(o.NamedRef), mountstatus.Get(o.VolumeId))
+		return nil
+	}
+
+	ro := o.ReadOnly ||
+		o.VolumeCapability.AccessMode.Mode == csi.VolumeCapability_AccessMode_SINGLE_NODE_READER_ONLY ||
+		o.VolumeCapability.AccessMode.Mode == csi.VolumeCapability_AccessMode_MULTI_NODE_READER_ONLY
+
+	if !m.asyncMount {
+		mountstatus.Update(o.VolumeId, mountstatus.StillMounting)
+		if err := m.mounter.Mount(o.Context, o.VolumeId, backend.MountTarget(o.TargetPath), o.NamedRef, ro); err != nil {
+			
mountstatus.Update(o.VolumeId, mountstatus.Errored)
+			return err
+		}
+		mountstatus.Update(o.VolumeId, mountstatus.Mounted)
+		return nil
+	}
+
+	go func() {
+		m.mutex.Lock()
+		defer m.mutex.Unlock()
+		ctx, cancel := context.WithTimeout(context.Background(), mountCtxTimeout)
+		defer cancel()
+
+		mountstatus.Update(o.VolumeId, mountstatus.StillMounting)
+		if err := m.mounter.Mount(ctx, o.VolumeId, backend.MountTarget(o.TargetPath), o.NamedRef, ro); err != nil {
+			klog.Errorf("mount err: %v", err.Error())
+			mountstatus.Update(o.VolumeId, mountstatus.Errored)
+			m.asyncErrs[o.NamedRef] = fmt.Errorf("err: %v: %v", err, m.asyncErrs[o.NamedRef])
+			return
+		}
+		mountstatus.Update(o.VolumeId, mountstatus.Mounted)
+	}()
+
+	return nil
+}
+
+// WaitForMount waits for the volume to get mounted
+func (m *MountExecutor) WaitForMount(o *MountOptions) error {
+	if pullstatus.Get(o.NamedRef) != pullstatus.Pulled {
+		return nil
+	}
+
+	if !m.asyncMount {
+		return nil
+	}
+
+	mountCondFn := func() (done bool, err error) {
+		if mountstatus.Get(o.VolumeId) == mountstatus.Mounted {
+			return true, nil
+		}
+		if m.asyncErrs[o.NamedRef] != nil {
+			return false, m.asyncErrs[o.NamedRef]
+		}
+		return false, nil
+	}
+
+	if err := wait.PollImmediate(
+		mountPollTimeInterval,
+		mountPollTimeout,
+		mountCondFn); err != nil {
+		return fmt.Errorf("waited too long to mount the image: %v", err)
+	}
+
+	return nil
+}
diff --git a/pkg/mountstatus/mountstatus.go b/pkg/mountstatus/mountstatus.go
new file mode 100644
index 0000000..0b9095f
--- /dev/null
+++ b/pkg/mountstatus/mountstatus.go
@@ -0,0 +1,63 @@
+package mountstatus
+
+import (
+	"sync"
+)
+
+// ImageMountStatus represents the mount status of an image
+type ImageMountStatus int
+
+// https://stackoverflow.com/questions/14426366/what-is-an-idiomatic-way-of-representing-enums-in-go
+const (
+	// StatusNotFound means there has been no attempt to mount the image
+	StatusNotFound ImageMountStatus = -1
+	// StillMounting means the image is still being mounted as a volume
+	StillMounting ImageMountStatus = iota
+	// Mounted means the image has been mounted as a volume
+	Mounted
+	// Errored means there was an error during image mount
+	Errored
+)
+
+// ImageMountStatusRecorder records the status of image mounts
+type ImageMountStatusRecorder struct {
+	status map[string]ImageMountStatus
+	mutex  sync.Mutex
+}
+
+var i ImageMountStatusRecorder
+
+func init() {
+	i = ImageMountStatusRecorder{
+		status: make(map[string]ImageMountStatus),
+		mutex:  sync.Mutex{},
+	}
+}
+
+// Update updates the mount status of an image
+func Update(volumeId string, status ImageMountStatus) {
+	i.mutex.Lock()
+	defer i.mutex.Unlock()
+
+	i.status[volumeId] = status
+}
+
+// Delete deletes the mount status of an image
+func Delete(volumeId string) {
+	i.mutex.Lock()
+	defer i.mutex.Unlock()
+
+	delete(i.status, volumeId)
+}
+
+// Get gets the mount status of an image
+func Get(volumeId string) ImageMountStatus {
+	i.mutex.Lock()
+	defer i.mutex.Unlock()
+
+	if _, ok := i.status[volumeId]; !ok {
+		return StatusNotFound
+	}
+
+	return i.status[volumeId]
+}
diff --git a/pkg/pullexecutor/pullexecutor.go b/pkg/pullexecutor/pullexecutor.go
new file mode 100644
index 0000000..1fe9e58
--- /dev/null
+++ b/pkg/pullexecutor/pullexecutor.go
@@ -0,0 +1,150 @@
+package pullexecutor
+
+import (
+	"context"
+	"fmt"
+	"sync"
+	"time"
+
+	"github.com/containerd/containerd/reference/docker"
+	"github.com/pkg/errors"
+	"github.com/warm-metal/csi-driver-image/pkg/backend"
+	"github.com/warm-metal/csi-driver-image/pkg/pullstatus"
+	"github.com/warm-metal/csi-driver-image/pkg/remoteimage"
+	"github.com/warm-metal/csi-driver-image/pkg/secret"
+	"k8s.io/apimachinery/pkg/util/wait"
+	cri "k8s.io/cri-api/pkg/apis/runtime/v1alpha2"
+	"k8s.io/klog/v2"
+)
+
+const (
+	pullPollTimeInterval = 100 * time.Millisecond
+	pullPollTimeout      = 2 * time.Minute
+	pullCtxTimeout       = 10 * time.Minute
+)
+
+// PullExecutorOptions are the options passed to the pull executor
+type PullExecutorOptions struct {
+	AsyncPull          bool
+	ImageServiceClient cri.ImageServiceClient
+	SecretStore        secret.Store
+	Mounter            *backend.SnapshotMounter
+}
+
+// PullOptions are the options for a single pull request
+type PullOptions struct {
+	// Context here is only valid for synchronous mounts
+	Context     context.Context
+	NamedRef    docker.Named
+	PullAlways  bool
+	PullSecrets map[string]string
+	Image       string
+}
+
+// PullExecutor executes the pulls
+type PullExecutor struct {
+	asyncPull      bool
+	imageSvcClient cri.ImageServiceClient
+	mutex          *sync.Mutex
+	asyncErrs      map[docker.Named]error
+	secretStore    secret.Store
+	mounter        *backend.SnapshotMounter
+}
+
+// NewPullExecutor initializes a new pull executor object
+func NewPullExecutor(o *PullExecutorOptions) *PullExecutor {
+	return &PullExecutor{
+		asyncPull:      o.AsyncPull,
+		mutex:          &sync.Mutex{},
+		imageSvcClient: o.ImageServiceClient,
+		secretStore:    o.SecretStore,
+		mounter:        o.Mounter,
+		asyncErrs:      make(map[docker.Named]error),
+	}
+}
+
+// StartPulling starts pulling the image
+func (m *PullExecutor) StartPulling(o *PullOptions) error {
+
+	keyring, err := m.secretStore.GetDockerKeyring(o.Context, o.PullSecrets)
+	if err != nil {
+		return errors.Errorf("unable to fetch keyring: %s", err)
+	}
+
+	if !m.asyncPull {
+		puller := remoteimage.NewPuller(m.imageSvcClient, o.NamedRef, keyring)
+		shouldPull := o.PullAlways || !m.mounter.ImageExists(o.Context, o.NamedRef)
+		if shouldPull {
+			klog.Infof("pull image %q", o.Image)
+			pullstatus.Update(o.NamedRef, pullstatus.StillPulling)
+			if err = puller.Pull(o.Context); err != nil {
+				pullstatus.Update(o.NamedRef, pullstatus.Errored)
+				return errors.Errorf("unable to pull image %q: %s", o.NamedRef, err)
+			}
+		}
+		pullstatus.Update(o.NamedRef, pullstatus.Pulled)
+		return nil
+	}
+
+	if pullstatus.Get(o.NamedRef) == pullstatus.Pulled ||
+		pullstatus.Get(o.NamedRef) == pullstatus.StillPulling {
+		return nil
+	}
+
+	go func() {
+		if pullstatus.Get(o.NamedRef) == pullstatus.StatusNotFound {
+			m.mutex.Lock()
+			defer m.mutex.Unlock()
+			c, cancel := context.WithTimeout(context.Background(), pullCtxTimeout)
+			defer cancel()
+
+			if pullstatus.Get(o.NamedRef) == pullstatus.StillPulling {
+				return
+			}
+
+			puller := remoteimage.NewPuller(m.imageSvcClient, o.NamedRef, keyring)
+			// o.Context is only valid for synchronous pulls; use the goroutine-scoped context here
+			shouldPull := o.PullAlways || !m.mounter.ImageExists(c, o.NamedRef)
+			if shouldPull {
+				klog.Infof("pull image %q", o.Image)
+				pullstatus.Update(o.NamedRef, pullstatus.StillPulling)
+				if err = puller.Pull(c); err != nil {
+					pullstatus.Update(o.NamedRef, pullstatus.Errored)
+					m.asyncErrs[o.NamedRef] = fmt.Errorf("unable to pull image %q: %s", o.Image, err)
+					return
+				}
+				pullstatus.Update(o.NamedRef, pullstatus.Pulled)
+
+			}
+		}
+	}()
+
+	return nil
+}
+
+// WaitForPull waits until the image pull succeeds or errors or timeout is exceeded
+func (m *PullExecutor) WaitForPull(o *PullOptions) error {
+	if !m.asyncPull {
+		return nil
+	}
+
+	condFn := func() (done bool, err error) {
+		if pullstatus.Get(o.NamedRef) == pullstatus.Pulled {
+			return true, nil
+		}
+
+		if m.asyncErrs[o.NamedRef] != nil {
+			return false, m.asyncErrs[o.NamedRef]
+		}
+		return false,
nil
+	}
+
+	if err := wait.PollImmediate(
+		pullPollTimeInterval,
+		pullPollTimeout,
+		condFn); err != nil {
+		return errors.Errorf("waited too long to download the image: %v", err)
+	}
+
+	return nil
+}
diff --git a/pkg/pullstatus/pullstatus.go b/pkg/pullstatus/pullstatus.go
new file mode 100644
index 0000000..1a9a672
--- /dev/null
+++ b/pkg/pullstatus/pullstatus.go
@@ -0,0 +1,65 @@
+package pullstatus
+
+import (
+	"sync"
+
+	"github.com/containerd/containerd/reference/docker"
+)
+
+// ImagePullStatus represents the pull status of an image
+type ImagePullStatus int
+
+// https://stackoverflow.com/questions/14426366/what-is-an-idiomatic-way-of-representing-enums-in-go
+const (
+	// StatusNotFound means there has been no attempt to pull the image
+	StatusNotFound ImagePullStatus = -1
+	// StillPulling means the image is still being pulled
+	StillPulling ImagePullStatus = iota
+	// Pulled means the image has been pulled
+	Pulled
+	// Errored means there was an error during image pull
+	Errored
+)
+
+// ImagePullStatusRecorder records the status of image pulls
+type ImagePullStatusRecorder struct {
+	status map[docker.Named]ImagePullStatus
+	mutex  sync.Mutex
+}
+
+var i ImagePullStatusRecorder
+
+func init() {
+	i = ImagePullStatusRecorder{
+		status: make(map[docker.Named]ImagePullStatus),
+		mutex:  sync.Mutex{},
+	}
+}
+
+// Update updates the pull status of an image
+func Update(imageRef docker.Named, status ImagePullStatus) {
+	i.mutex.Lock()
+	defer i.mutex.Unlock()
+
+	i.status[imageRef] = status
+}
+
+// Delete deletes the pull status of an image
+func Delete(imageRef docker.Named) {
+	i.mutex.Lock()
+	defer i.mutex.Unlock()
+
+	delete(i.status, imageRef)
+}
+
+// Get gets the pull status of an image
+func Get(imageRef docker.Named) ImagePullStatus {
+	i.mutex.Lock()
+	defer i.mutex.Unlock()
+
+	if _, ok := i.status[imageRef]; !ok {
+		return StatusNotFound
+	}
+
+	return i.status[imageRef]
+}
diff --git a/pkg/remoteimage/pull.go b/pkg/remoteimage/pull.go
index 6cec330..9df21da 100644
--- a/pkg/remoteimage/pull.go
+++ b/pkg/remoteimage/pull.go
@@ -13,7 +13,8 @@ type Puller interface {
 	Pull(context.Context) error
 }
 
-func NewPuller(imageSvc cri.ImageServiceClient, image docker.Named, keyring credentialprovider.DockerKeyring) Puller {
+func NewPuller(imageSvc cri.ImageServiceClient, image docker.Named,
+	keyring credentialprovider.DockerKeyring) Puller {
 	return &puller{
 		imageSvc: imageSvc,
 		image:    image,
diff --git a/pkg/remoteimage/pull_test.go b/pkg/remoteimage/pull_test.go
new file mode 100644
index 0000000..52a1d18
--- /dev/null
+++ b/pkg/remoteimage/pull_test.go
@@ -0,0 +1,28 @@
+package remoteimage
+
+import (
+	"context"
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/assert"
+	"github.com/warm-metal/csi-driver-image/pkg/cri"
+	"k8s.io/cri-api/pkg/apis/runtime/v1alpha2"
+)
+
+// Check test/integration/node-server/README.md for how to run this test correctly
+func TestPull(t *testing.T) {
+	testImage := "docker.io/library/redis:latest"
+	socketAddr := "unix:///run/containerd/containerd.sock"
+	criClient, err := cri.NewRemoteImageService(socketAddr, time.Minute)
+	assert.NoError(t, err)
+	assert.NotNil(t, criClient)
+
+	r, err := criClient.PullImage(context.Background(), &v1alpha2.PullImageRequest{
+		Image: &v1alpha2.ImageSpec{
+			Image: testImage,
+		},
+	})
+	assert.NoError(t, err)
+	assert.NotNil(t, r)
+}
diff --git a/test/integration/node-server/Dockerfile.containerd
b/test/integration/node-server/Dockerfile.containerd
new file mode 100644
index 0000000..ac67eeb
--- /dev/null
+++ b/test/integration/node-server/Dockerfile.containerd
@@ -0,0 +1,28 @@
+# bookworm is the name of the latest Debian release at the time of writing
+FROM golang:1.19-bookworm
+
+# Based on https://docs.docker.com/engine/install/debian/
+
+# Add Docker's official GPG key:
+RUN apt-get update
+RUN apt-get install -y ca-certificates curl gnupg
+RUN install -m 0755 -d /etc/apt/keyrings
+RUN curl -fsSL https://download.docker.com/linux/debian/gpg | gpg --dearmor -o /etc/apt/keyrings/docker.gpg
+RUN chmod a+r /etc/apt/keyrings/docker.gpg
+
+# Add the repository to Apt sources:
+RUN echo "deb [arch="$(dpkg --print-architecture)" signed-by=/etc/apt/keyrings/docker.gpg] https://download.docker.com/linux/debian bookworm stable" | tee /etc/apt/sources.list.d/docker.list > /dev/null
+RUN apt-get update
+
+RUN apt-get install docker-ce docker-ce-cli containerd.io docker-buildx-plugin docker-compose-plugin -y
+RUN apt-get install libbtrfs-dev libgpgme-dev libdevmapper-dev -y
+
+# Install CNI plugins
+RUN curl -LO https://github.com/containernetworking/plugins/releases/download/v1.3.0/cni-plugins-linux-amd64-v1.3.0.tgz
+RUN mkdir -p /opt/cni/bin
+RUN tar Cxzvf /opt/cni/bin cni-plugins-linux-amd64-v1.3.0.tgz
+# To enable the CRI plugin in containerd
+RUN sed -i 's/disabled_plugins/#disabled_plugins/g' /etc/containerd/config.toml
+# Directory for the node server's CSI socket
+RUN mkdir /csi
+# Fixes https://github.com/containers/buildah/issues/2922
diff --git a/test/integration/node-server/README.md b/test/integration/node-server/README.md
new file mode 100644
index 0000000..4e42121
--- /dev/null
+++ b/test/integration/node-server/README.md
@@ -0,0 +1,83 @@
+## What is this?
+This directory contains two files:
+1. `docker-compose.yaml`: used to mount the workspace into a container built from `Dockerfile.containerd`
+2. `Dockerfile.containerd`: used to run containerd
+
+## Why?
+This is to test `cmd/plugin/node_server_test.go` and `pkg/remoteimage/pull_test.go`
+
+## How to run the tests?
+Build the image
+```shell
+$ docker build . -f Dockerfile.containerd -t <image-name>:<tag>
+```
+For example
+```shell
+$ docker build . -f Dockerfile.containerd -t containerd-on-mac:0.6
+```
+
+Start a container using the image:
+```
+$ docker-compose up
+```
+
+For example:
+```shell
+$ docker-compose up
+[+] Building 0.0s (0/0)            docker:desktop-linux
+[+] Running 1/0
+ ✔ Container node-server-containerd-workspace-1  Recreated             0.0s
+Attaching to node-server-containerd-workspace-1
+node-server-containerd-workspace-1  | time="2023-11-08T09:03:00Z" level=warning msg="containerd config version `1` has been deprecated and will be removed in containerd v2.0, please switch to version `2`, see https://github.com/containerd/containerd/blob/main/docs/PLUGINS.md#version-header"
+node-server-containerd-workspace-1  | time="2023-11-08T09:03:00.484125750Z" level=info msg="starting containerd" revision=61f9fd88f79f081d64d6fa3bb1a0dc71ec870523 version=1.6.24
+node-server-containerd-workspace-1  | time="2023-11-08T09:03:00.491186708Z" level=info msg="loading plugin \"io.containerd.content.v1.content\"..." type=io.containerd.content.v1
+...
+```
+
+`exec` into the docker container:
+```
+$ docker ps
+```
+For example:
+```shell
+$ docker ps
+CONTAINER ID   IMAGE                   COMMAND        CREATED          STATUS          PORTS     NAMES
+7769b9e621f1   containerd-on-mac:0.6   "containerd"   24 minutes ago   Up 24 minutes             node-server-containerd-workspace-1
+```
+
+Get the `CONTAINER ID` and `exec` into it using:
+```shell
+$ docker exec -it <container-id> bash
+```
+For example:
+```shell
+$ docker exec -it 7769b9e621f1 bash
+root@7769b9e621f1:/go#
+```
+
+**To run `TestNodePublishVolume`**:
+
+`cd` into `/code/cmd/plugin` and run `go test -run 'TestNodePublishVolume'` (note that `TestNodePublishVolume` is a regex)
+```
+root@7769b9e621f1:/go# cd /code/cmd/plugin
+root@7769b9e621f1:/code/cmd/plugin# go test -run 'TestNodePublishVolume'
+I1108 10:27:03.784182   19936 mounter.go:45] load 0 snapshots from runtime
+I1108 10:27:03.787347   19936 server.go:108] Listening for connections on address: &net.UnixAddr{Name:"//csi/csi.sock", Net:"unix"}
+...
+I1108 10:27:13.222601   19936 node_server_test.go:94] server was stopped
+I1108 10:27:13.225907   19936 mounter.go:45] load 0 snapshots from runtime
+I1108 10:27:13.235697   19936 server.go:108] Listening for connections on address: &net.UnixAddr{Name:"//csi/csi.sock", Net:"unix"}
+...
+PASS
+ok  	github.com/warm-metal/csi-driver-image/cmd/plugin	46.711s
+```
+
+**To run `TestPull`**:
+`cd` into `/code/pkg/remoteimage` and run `go test -run 'TestPull'` (note that `TestPull` is a regex)
+```
+root@cdf7ee254501:~# cd /code/pkg/remoteimage
+root@cdf7ee254501:/code/pkg/remoteimage# go test -run 'TestPull'
+PASS
+ok  	github.com/warm-metal/csi-driver-image/pkg/remoteimage	2.247s
+root@cdf7ee254501:/code/pkg/remoteimage#
+```
\ No newline at end of file
diff --git a/test/integration/node-server/docker-compose.yaml b/test/integration/node-server/docker-compose.yaml
new file mode 100644
index 0000000..f462c7d
--- /dev/null
+++ b/test/integration/node-server/docker-compose.yaml
@@ -0,0 +1,9 @@
+services:
+  containerd-workspace:
+    # replace the image here with the image you built using `Dockerfile.containerd`
+    image: containerd-on-mac:0.6
+    privileged: true # to enable mount/unmount operations
+    command: containerd
+    # mount the code in this repo to the `/code` path in the container
+    volumes:
+      - ../../../.:/code
\ No newline at end of file
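
---

Deployment note: the new behavior is opt-in via the `--async-pull-mount` flag added in `cmd/plugin/main.go` (default `false`, so the old synchronous pull-then-mount path in `NodePublishVolume` remains the default). A minimal sketch of enabling it on the node plugin follows; the container name, image tag, and the `--endpoint` argument are illustrative assumptions about the deployment layout, only `--mode` and `--async-pull-mount` come from this patch:

```yaml
# Hypothetical excerpt of the node plugin DaemonSet container spec.
containers:
  - name: csi-image-warm-metal          # assumed name
    image: warmmetal/csi-image:v0.7.0   # assumed image tag
    args:
      - --endpoint=$(CSI_ENDPOINT)      # assumed; endpoint wiring varies by deployment
      - --mode=node
      - --async-pull-mount=true         # new flag from this patch (default: false)
```

With the flag enabled, kubelet's short `NodePublishVolume` deadline no longer has to cover the whole image download: the pull and mount run in the background and kubelet's retries poll their recorded status instead of timing out on large images.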