From 1e20f356bf9f5beb0f94a8078529c7d198ad24f3 Mon Sep 17 00:00:00 2001 From: antoinetran Date: Fri, 17 Jan 2025 21:17:33 +0100 Subject: [PATCH 1/5] CI/CD improve docker build cache with github actions and Dockerfile --- .github/workflows/build_images.yaml | 12 ++++++++++++ docker/Dockerfile.vk | 4 ++-- 2 files changed, 14 insertions(+), 2 deletions(-) diff --git a/.github/workflows/build_images.yaml b/.github/workflows/build_images.yaml index a02bdd9f..6f26348d 100644 --- a/.github/workflows/build_images.yaml +++ b/.github/workflows/build_images.yaml @@ -29,6 +29,18 @@ jobs: - name: Get Repo Owner id: get_repo_owner run: echo ::set-output name=repo_owner::$(echo ${{ github.repository_owner }} | tr '[:upper:]' '[:lower:]') + + # See https://docs.docker.com/build/ci/github-actions/cache/ for cache to speed go build + - name: Go Build Cache for Docker + uses: actions/cache@v4 + with: + path: go-build-cache + key: ${{ runner.os }}-go-build-cache-${{ hashFiles('**/go.sum') }} + - name: Inject go-build-cache + uses: reproducible-containers/buildkit-cache-dance@4b2444fec0c0fb9dbf175a96c094720a692ef810 # v2.1.4 + with: + cache-source: go-build-cache + - name: Build container base image vk uses: docker/build-push-action@v5 with: diff --git a/docker/Dockerfile.vk b/docker/Dockerfile.vk index f0249d25..73249171 100644 --- a/docker/Dockerfile.vk +++ b/docker/Dockerfile.vk @@ -15,8 +15,8 @@ RUN mkdir -p $GOMODCACHE && mkdir -p $GOCACHE RUN bash -c "KUBELET_VERSION=${VERSION} ./cmd/virtual-kubelet/set-version.sh" -RUN go mod tidy -RUN CGO_ENABLED=0 GOOS=linux go build -o bin/vk cmd/virtual-kubelet/main.go +RUN --mount=type=cache,target=/go/pkg/mod bash -c "time go mod tidy" +RUN --mount=type=cache,target=/go/build-cache bash -c "time CGO_ENABLED=0 GOOS=linux go build -o bin/vk cmd/virtual-kubelet/main.go" # Deploy the application binary into a lean image FROM ubuntu:22.04 AS build-release-stage From d3f52f9435d56c1e31f3df293e89685cf00d5ef5 Mon Sep 17 00:00:00 2001 From: antoinetran Date: Thu, 19 Dec 2024 16:17:07 +0100 Subject: [PATCH 2/5] Fix https://github.com/interTwin-eu/interlink-slurm-plugin/issues/45 - partial support of ProjectedVolume, CreateToken API to get token, env vars to provide k8s API URL --- ci/manifests/service-account.yaml | 7 + .../interlink-docker/vk/service-account.yaml | 7 + pkg/interlink/api/create.go | 11 + pkg/interlink/api/func.go | 69 +++- pkg/interlink/types.go | 15 +- pkg/virtualkubelet/config.go | 33 +- pkg/virtualkubelet/execute.go | 361 +++++++++++++++--- 7 files changed, 409 insertions(+), 94 deletions(-) diff --git a/ci/manifests/service-account.yaml b/ci/manifests/service-account.yaml index 2169e592..3ff9ba59 100644 --- a/ci/manifests/service-account.yaml +++ b/ci/manifests/service-account.yaml @@ -33,6 +33,13 @@ rules: - get - list - watch +# For https://kubernetes.io/docs/reference/kubernetes-api/authentication-resources/token-request-v1/ +- apiGroups: [""] + resources: ["serviceaccounts/token"] + verbs: + - create + - get + - list - apiGroups: - "" resources: diff --git a/example/interlink-docker/vk/service-account.yaml b/example/interlink-docker/vk/service-account.yaml index 3dbaa7f2..e25d0914 100644 --- a/example/interlink-docker/vk/service-account.yaml +++ b/example/interlink-docker/vk/service-account.yaml @@ -33,6 +33,13 @@ rules: - get - list - watch +# For https://kubernetes.io/docs/reference/kubernetes-api/authentication-resources/token-request-v1/ +- apiGroups: [""] + resources: ["serviceaccounts/token"] + verbs: + - create + - get + - list - apiGroups: - 
"" resources: diff --git a/pkg/interlink/api/create.go b/pkg/interlink/api/create.go index 1c4bde3a..5f0390d9 100644 --- a/pkg/interlink/api/create.go +++ b/pkg/interlink/api/create.go @@ -64,6 +64,17 @@ func (h *InterLinkHandler) CreateHandler(w http.ResponseWriter, r *http.Request) return } + if log.G(h.Ctx).Logger.IsLevelEnabled(log.DebugLevel) { + // For debugging purpose only. + allContainers := pod.Pod.Spec.InitContainers + allContainers = append(allContainers, pod.Pod.Spec.Containers...) + for _, container := range allContainers { + for _, envVar := range container.Env { + log.G(h.Ctx).Debug("InterLink VK environment variable to pod ", pod.Pod.Name, " container: ", container.Name, " env: ", envVar.Name, " value: ", envVar.Value) + } + } + } + retrievedData = append(retrievedData, data) if retrievedData != nil { diff --git a/pkg/interlink/api/func.go b/pkg/interlink/api/func.go index d6ee2e0c..3c2aa490 100644 --- a/pkg/interlink/api/func.go +++ b/pkg/interlink/api/func.go @@ -3,6 +3,7 @@ package api import ( "context" "path/filepath" + "strings" "sync" "time" @@ -35,10 +36,10 @@ func getData(ctx context.Context, config types.Config, pod types.PodCreateReques startContainer := time.Now().UnixMicro() log.G(ctx).Info("- Retrieving Secrets and ConfigMaps for the Docker Sidecar. InitContainer: " + container.Name) log.G(ctx).Debug(container.VolumeMounts) - data, InterlinkIP := retrieveData(ctx, config, pod, container) - if InterlinkIP != nil { - log.G(ctx).Error(InterlinkIP) - return types.RetrievedPodData{}, InterlinkIP + data, err := retrieveData(ctx, config, pod, container) + if err != nil { + log.G(ctx).Error(err) + return types.RetrievedPodData{}, err } retrievedData.Containers = append(retrievedData.Containers, data) @@ -75,40 +76,76 @@ func getData(ctx context.Context, config types.Config, pod types.PodCreateReques // It returns the retrieved data in a variable of type commonIL.RetrievedContainer and the first encountered error. func retrieveData(ctx context.Context, config types.Config, pod types.PodCreateRequests, container v1.Container) (types.RetrievedContainer, error) { retrievedData := types.RetrievedContainer{} + retrievedData.Name = container.Name for _, mountVar := range container.VolumeMounts { - log.G(ctx).Debug("-- Retrieving data for mountpoint " + mountVar.Name) + log.G(ctx).Debug("-- Retrieving data for mountpoint ", mountVar.Name) + loopVolumes: for _, vol := range pod.Pod.Spec.Volumes { if vol.Name == mountVar.Name { switch { case vol.ConfigMap != nil: - - log.G(ctx).Info("--- Retrieving ConfigMap " + vol.ConfigMap.Name) - retrievedData.Name = container.Name + log.G(ctx).Info("--- Retrieving ConfigMap ", vol.ConfigMap.Name) for _, cfgMap := range pod.ConfigMaps { if cfgMap.Name == vol.ConfigMap.Name { - retrievedData.Name = container.Name + log.G(ctx).Debug("configMap found! Name: ", cfgMap.Name) retrievedData.ConfigMaps = append(retrievedData.ConfigMaps, cfgMap) + break loopVolumes + } + } + // This should not happen, error. Building error context. 
+                    var configMapsKeys []string
+                    for _, cfgMap := range pod.ConfigMaps {
+                        configMapsKeys = append(configMapsKeys, cfgMap.Name)
+                    }
+                    log.G(ctx).Errorf("could not find in retrievedData the matching object for volume: %s (pod: %s container: %s configMap: %s) retrievedData keys: %s", vol.Name,
+                        pod.Pod.Name, container.Name, vol.ConfigMap.Name, strings.Join(configMapsKeys, ","))
+
+                case vol.Projected != nil:
+                    log.G(ctx).Info("--- Retrieving ProjectedVolume ", vol.Name)
+                    for _, projectedVolumeMap := range pod.ProjectedVolumeMaps {
+                        log.G(ctx).Debug("Comparing projectedVolumeMap.Name: ", projectedVolumeMap.Name, " with vol.Name: ", vol.Name)
+                        if projectedVolumeMap.Name == vol.Name {
+                            log.G(ctx).Debug("projectedVolumeMap found! Name: ", projectedVolumeMap.Name)
+
+                            retrievedData.ProjectedVolumeMaps = append(retrievedData.ProjectedVolumeMaps, projectedVolumeMap)
+                            break loopVolumes
+                        }
+                    }
+                    // This should not happen, error. Building error context.
+                    var projectedVolumeMapsKeys []string
+                    for _, projectedVolumeMap := range pod.ProjectedVolumeMaps {
+                        projectedVolumeMapsKeys = append(projectedVolumeMapsKeys, projectedVolumeMap.Name)
+                    }
+                    log.G(ctx).Errorf("could not find in retrievedData the matching object for volume: %s (pod: %s container: %s projectedVolumeMap) retrievedData keys: %s",
+                        vol.Name, pod.Pod.Name, container.Name, strings.Join(projectedVolumeMapsKeys, ","))

                 case vol.Secret != nil:
-
-                    log.G(ctx).Info("--- Retrieving Secret " + vol.Secret.SecretName)
-                    retrievedData.Name = container.Name
+                    log.G(ctx).Info("--- Retrieving Secret ", vol.Secret.SecretName)
                     for _, secret := range pod.Secrets {
                         if secret.Name == vol.Secret.SecretName {
-                            retrievedData.Name = container.Name
+                            log.G(ctx).Debug("secret found! Name: ", secret.Name)
                             retrievedData.Secrets = append(retrievedData.Secrets, secret)
+                            break loopVolumes
                         }
                     }
+                    // This should not happen, error. Building error context.
+                    var secretKeys []string
+                    for _, secret := range pod.Secrets {
+                        secretKeys = append(secretKeys, secret.Name)
+                    }
+                    log.G(ctx).Errorf("could not find in retrievedData the matching object for volume: %s (pod: %s container: %s secret: %s) retrievedData keys: %s",
+                        vol.Name, pod.Pod.Name, container.Name, vol.Secret.SecretName, strings.Join(secretKeys, ","))

                 case vol.EmptyDir != nil:
-                    edPath := filepath.Join(config.DataRootFolder, pod.Pod.Namespace+"-"+string(pod.Pod.UID)+"/"+"emptyDirs/"+vol.Name)
-
-                    retrievedData.Name = container.Name
+                    // Deprecated: EmptyDirs is useless at VK level. It should be moved to plugin level.
+                    edPath := filepath.Join(config.DataRootFolder, pod.Pod.Namespace+"-"+string(pod.Pod.UID), "emptyDirs", vol.Name)
                     retrievedData.EmptyDirs = append(retrievedData.EmptyDirs, edPath)
+
+                default:
+                    log.G(ctx).Warning("ignoring unsupported volume type for ", mountVar.Name)
                 }
+
             }
         }
     }
diff --git a/pkg/interlink/types.go b/pkg/interlink/types.go index 70981687..0d09de8d 100644 --- a/pkg/interlink/types.go +++ b/pkg/interlink/types.go @@ -11,6 +11,9 @@ type PodCreateRequests struct { Pod v1.Pod `json:"pod"` ConfigMaps []v1.ConfigMap `json:"configmaps"` Secrets []v1.Secret `json:"secrets"` + // The projected volumes are those created by ServiceAccounts (in K8S >= 1.24). They are automatically added in the pod from kubelet code. + // Here the configmap will hold the file names (as keys) and their content (as values). + ProjectedVolumeMaps []v1.ConfigMap `json:"projectedvolumesmaps"` } // PodStatus is a simplified v1.Pod struct, holding only necessary variables to uniquely identify a job/service in the sidecar. 
It is used to request @@ -31,10 +34,14 @@ type CreateStruct struct { // RetrievedContainer is used in InterLink to rearrange data structure in a suitable way for the sidecar type RetrievedContainer struct { - Name string `json:"name"` - ConfigMaps []v1.ConfigMap `json:"configMaps"` - Secrets []v1.Secret `json:"secrets"` - EmptyDirs []string `json:"emptyDirs"` + Name string `json:"name"` + ConfigMaps []v1.ConfigMap `json:"configMaps"` + ProjectedVolumeMaps []v1.ConfigMap `json:"projectedvolumemaps"` + Secrets []v1.Secret `json:"secrets"` + // Deprecated: EmptyDirs should be built on plugin side. + // Currently, it holds the DATA_ROOT_DIR/emptydirs/volumeName, but this should be a plugin choice instead, + // like it currently is for ConfigMaps, ProjectedVolumeMaps, Secrets. + EmptyDirs []string `json:"emptyDirs"` } // RetrievedPoData is used in InterLink to rearrange data structure in a suitable way for the sidecar diff --git a/pkg/virtualkubelet/config.go b/pkg/virtualkubelet/config.go index 26215370..c9bb4d27 100644 --- a/pkg/virtualkubelet/config.go +++ b/pkg/virtualkubelet/config.go @@ -2,21 +2,24 @@ package virtualkubelet // Config holds the whole configuration type Config struct { - InterlinkURL string `yaml:"InterlinkURL"` - Interlinkport string `yaml:"InterlinkPort"` - VKConfigPath string `yaml:"VKConfigPath"` - VKTokenFile string `yaml:"VKTokenFile"` - ServiceAccount string `yaml:"ServiceAccount"` - Namespace string `yaml:"Namespace"` - PodIP string `yaml:"PodIP"` - VerboseLogging bool `yaml:"VerboseLogging"` - ErrorsOnlyLogging bool `yaml:"ErrorsOnlyLogging"` - HTTP HTTP `yaml:"HTTP"` - KubeletHTTP HTTP `yaml:"KubeletHTTP"` - CPU string `yaml:"CPU,omitempty"` - Memory string `yaml:"Memory,omitempty"` - Pods string `yaml:"Pods,omitempty"` - GPU string `yaml:"nvidia.com/gpu,omitempty"` + InterlinkURL string `yaml:"InterlinkURL"` + Interlinkport string `yaml:"InterlinkPort"` + KubernetesApiAddr string `yaml:"KubernetesApiAddr"` + KubernetesApiPort string `yaml:"KubernetesApiPort"` + KubernetesApiCaCrt string `yaml:"KubernetesApiCaCrt"` + VKConfigPath string `yaml:"VKConfigPath"` + VKTokenFile string `yaml:"VKTokenFile"` + ServiceAccount string `yaml:"ServiceAccount"` + Namespace string `yaml:"Namespace"` + PodIP string `yaml:"PodIP"` + VerboseLogging bool `yaml:"VerboseLogging"` + ErrorsOnlyLogging bool `yaml:"ErrorsOnlyLogging"` + HTTP HTTP `yaml:"HTTP"` + KubeletHTTP HTTP `yaml:"KubeletHTTP"` + CPU string `yaml:"CPU,omitempty"` + Memory string `yaml:"Memory,omitempty"` + Pods string `yaml:"Pods,omitempty"` + GPU string `yaml:"nvidia.com/gpu,omitempty"` } type HTTP struct { diff --git a/pkg/virtualkubelet/execute.go b/pkg/virtualkubelet/execute.go index 98b5153f..152ceece 100644 --- a/pkg/virtualkubelet/execute.go +++ b/pkg/virtualkubelet/execute.go @@ -18,6 +18,7 @@ import ( "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" trace "go.opentelemetry.io/otel/trace" + authenticationv1 "k8s.io/api/authentication/v1" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -27,8 +28,8 @@ import ( const PodPhaseInitialize = "Initializing" const PodPhaseCompleted = "Completed" -func failedMount(ctx context.Context, failed *bool, name string, pod *v1.Pod, p *Provider) error { - *failed = true +func failedMount(ctx context.Context, failedAndWait *bool, name string, pod *v1.Pod, p *Provider) error { + *failedAndWait = true log.G(ctx).Warning("Unable to find ConfigMap " + name + " for pod " + pod.Name + ". 
Waiting for it to be initialized") if pod.Status.Phase != PodPhaseInitialize { pod.Status.Phase = PodPhaseInitialize @@ -416,96 +417,338 @@ func LogRetrieval( return resp.Body, err } -// RemoteExecution is called by the VK everytime a Pod is being registered or deleted to/from the VK. -// Depending on the mode (CREATE/DELETE), it performs different actions, making different REST calls. -// Note: for the CREATE mode, the function gets stuck up to 5 minutes waiting for every missing ConfigMap/Secret. -// If after 5m they are not still available, the function errors out -func RemoteExecution(ctx context.Context, config Config, p *Provider, pod *v1.Pod, mode int8) error { +// Adds to pod environment variables related to services. For now, it only concerns Kubernetes API variables, example below: +/* +KUBERNETES_PORT=tcp://10.96.0.1:443 +KUBERNETES_SERVICE_PORT=443 +KUBERNETES_PORT_443_TCP_ADDR=10.96.0.1 +KUBERNETES_PORT_443_TCP_PORT=443 +KUBERNETES_PORT_443_TCP_PROTO=tcp +KUBERNETES_PORT_443_TCP=tcp://10.96.0.1:443 +KUBERNETES_SERVICE_PORT_HTTPS=443 +KUBERNETES_SERVICE_HOST=10.96.0.1 +*/ +func addKubernetesServicesEnvVars(ctx context.Context, config Config, pod *v1.Pod) { + if config.KubernetesApiAddr == "" || config.KubernetesApiPort == "" { + log.G(ctx).Info("InterLink configuration does not contains both KubernetesApiAddr and KubernetesApiPort, so no env var like KUBERNETES_SERVICE_HOST is added.") + return + } - token := "" - if config.VKTokenFile != "" { - b, err := os.ReadFile(config.VKTokenFile) // just pass the file name - if err != nil { - log.G(ctx).Fatal(err) - return err + appendEnvVar := func(envs *[]v1.EnvVar, name string, value string) { + envVar := v1.EnvVar{ + Name: name, + Value: value, } - token = string(b) + *envs = append(*envs, envVar) + } + appendEnvVars := func(containersPtr *[]v1.Container, index int) { + containers := *containersPtr + //container := containers[index] + envsPtr := &containers[index].Env + + appendEnvVar(envsPtr, "KUBERNETES_PORT", "tcp://"+config.KubernetesApiAddr+":"+config.KubernetesApiPort) + appendEnvVar(envsPtr, "KUBERNETES_SERVICE_PORT", config.KubernetesApiPort) + appendEnvVar(envsPtr, "KUBERNETES_PORT_443_TCP_ADDR", config.KubernetesApiAddr) + appendEnvVar(envsPtr, "KUBERNETES_PORT_443_TCP_PORT", config.KubernetesApiPort) + appendEnvVar(envsPtr, "KUBERNETES_PORT_443_TCP_PROTO", "tcp") + appendEnvVar(envsPtr, "KUBERNETES_PORT_443_TCP", "tcp://"+config.KubernetesApiAddr+":"+config.KubernetesApiPort) + appendEnvVar(envsPtr, "KUBERNETES_SERVICE_PORT_HTTPS", config.KubernetesApiPort) + appendEnvVar(envsPtr, "KUBERNETES_SERVICE_HOST", config.KubernetesApiAddr) + } + // Warning: loop range copy value, so to modify original containers, we must use index instead. + for i, _ := range pod.Spec.InitContainers { + appendEnvVars(&pod.Spec.InitContainers, i) + } + for i, _ := range pod.Spec.Containers { + appendEnvVars(&pod.Spec.Containers, i) } - switch mode { - case CREATE: - var req types.PodCreateRequests - var resp types.CreateStruct - req.Pod = *pod - startTime := time.Now() + if log.G(ctx).Logger.IsLevelEnabled(log.DebugLevel) { + // For debugging purpose only. 
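+        // Dump the per-container environment so the injected KUBERNETES_* variables can be verified in the VK logs.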
+        for _, container := range pod.Spec.InitContainers {
+            for _, envVar := range container.Env {
+                log.G(ctx).Debug("in addKubernetesServicesEnvVars InterLink VK environment variable to pod ", pod.Name, " container: ", container.Name, " env: ", envVar.Name, " value: ", envVar.Value)
+            }
+        }
+        for _, container := range pod.Spec.Containers {
+            for _, envVar := range container.Env {
+                log.G(ctx).Debug("in addKubernetesServicesEnvVars InterLink VK environment variable to pod ", pod.Name, " container: ", container.Name, " env: ", envVar.Name, " value: ", envVar.Value)
+            }
+        }
+    }
+    log.G(ctx).Info("InterLink VK added a set of environment variables (e.g.: KUBERNETES_SERVICE_HOST) to all containers of pod ",
+        pod.Name, " k8s addr ", config.KubernetesApiAddr, " k8s port ", config.KubernetesApiPort)
+}
+
+// Handles projected sources and fills the projectedVolume object.
+func remoteExecutionHandleProjectedSource(
+    ctx context.Context, p *Provider, pod *v1.Pod, source v1.VolumeProjection, projectedVolume *v1.ConfigMap,
+) error {
+    switch {
+    case source.ServiceAccountToken != nil:
+        /* Case
+        - serviceAccountToken:
+            expirationSeconds: 3600
+            path: token
+        */
+        log.G(ctx).Debug("Volume is a projected volume typed serviceAccountToken")
+
+        // Now using TokenRequest API (https://kubernetes.io/docs/reference/kubernetes-api/authentication-resources/token-request-v1/)
+        var expirationSeconds int64
+        /*
+            TODO: honor the expirationSeconds field and implement a rotation.
+            if source.ServiceAccountToken.ExpirationSeconds != nil {
+                expirationSeconds = *source.ServiceAccountToken.ExpirationSeconds
+            } else {
+                // If no expiration is set, default to 1h.
+                expirationSeconds = 3600
+            }
+        */
+        // Infinite = 100 years
+        expirationSeconds = 100 * 365 * 24 * 3600
+
+        // Bind it to the Pod, so that the token is deleted when the Pod is deleted. This is important given the unlimited expiration.
+        boundObjectRef := &authenticationv1.BoundObjectReference{
+            Kind: "Pod",
+            // Only one of UID or Name is sufficient, k8s will retrieve the other value.
+            UID:  pod.UID,
+            Name: pod.Name,
+        }
+        tokenRequest := &authenticationv1.TokenRequest{
+            Spec: authenticationv1.TokenRequestSpec{
+                // No need to set audience field. If set with wrong value, it might break token validity!
+                ExpirationSeconds: &expirationSeconds,
+                BoundObjectRef:    boundObjectRef,
+            },
+        }

-    timeNow := time.Now()
-    _, err := p.clientSet.CoreV1().Pods(pod.Namespace).Get(ctx, pod.Name, metav1.GetOptions{})
+        tokenRequestResult, err := p.clientSet.CoreV1().ServiceAccounts(pod.Namespace).CreateToken(
+            ctx, pod.Spec.ServiceAccountName, tokenRequest, metav1.CreateOptions{})
         if err != nil {
-        log.G(ctx).Warning("Deleted Pod before actual creation")
-        return nil
+            return fmt.Errorf("error during token request in remoteExecutionHandleProjectedSource() for pod %s: %w", pod.Name, err)
         }
+        log.G(ctx).Debug("could get token ", tokenRequestResult.Status.Token)
+
+        // Add found token to result.
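+        // The Path requested in the pod spec (e.g. "token") becomes the Data key; the plugin is
+        // expected to materialize it as a file under the projected volume mount point remotely.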
+ projectedVolume.Data[source.ServiceAccountToken.Path] = tokenRequestResult.Status.Token + + case source.ConfigMap != nil: + /* Case + - configMap: + items: + - key: ca.crt + path: ca.crt + name: kube-root-ca.crt + */ + for _, item := range source.ConfigMap.Items { + KUBE_CA_CRT := "kube-root-ca.crt" + overrideCaCrt := p.config.KubernetesApiCaCrt + if source.ConfigMap.Name == KUBE_CA_CRT && overrideCaCrt != "" { + log.G(ctx).Debug("handling special case of Kubernetes API kube-root-ca.crt, override found, using provided ca.crt:, ", overrideCaCrt) + projectedVolume.Data[item.Path] = overrideCaCrt + } else { + // This gets the usual certificate for K8s API, but it is restricted to whatever usual IP/FQDN of K8S API URL. + // With InterLink, the Kubernetes internal network is not accessible so this default ca.crt is probably useless. + log.G(ctx).Warning("using default Kubernetes API kube-root-ca.crt (no override found), but the default one might not be compatible with the subject: ", p.config.KubernetesApiAddr) + cfgmap, err := p.clientSet.CoreV1().ConfigMaps(pod.Namespace).Get(ctx, source.ConfigMap.Name, metav1.GetOptions{}) + if err != nil { + return fmt.Errorf("error during retrieval of ConfigMap %s error: %w", source.ConfigMap.Name, err) + } + if value, ok := cfgmap.Data[item.Key]; ok { + projectedVolume.Data[item.Path] = value + } else { + return fmt.Errorf("error during retrieval of key %s of (existing) ConfigMap %s error: %w", item.Key, source.ConfigMap.Name, err) + } + } + } + + case source.DownwardAPI != nil: + /* Case + - downwardAPI: + items: + - fieldRef: + apiVersion: v1 + fieldPath: metadata.namespace + path: namespace + */ + // https://kubernetes.io/docs/concepts/workloads/pods/downward-api/ + // See URL doc above, that describe what type of DownwardAPI to expect from volume. For now, only FieldRef is supported. + // The rest are ignored. + for _, item := range source.DownwardAPI.Items { + switch { + + case item.FieldRef != nil: + switch item.FieldRef.FieldPath { + case "metadata.name": + projectedVolume.Data[item.Path] = pod.Name + + case "metadata.namespace": + projectedVolume.Data[item.Path] = pod.Namespace - var failed bool + case "metadata.uid": + projectedVolume.Data[item.Path] = string(pod.UID) - for _, volume := range pod.Spec.Volumes { - for { - if timeNow.Sub(startTime).Seconds() < time.Hour.Minutes()*5 { - if volume.ConfigMap != nil { - cfgmap, err := p.clientSet.CoreV1().ConfigMaps(pod.Namespace).Get(ctx, volume.ConfigMap.Name, metav1.GetOptions{}) + // TODO implement DownwardAPI annotation and label if needed. + + default: + log.G(ctx).Warningf("in pod %s unsupported DownwardAPI FieldPath %s in InterLink, ignoring this source...", pod.Name, item.FieldRef.FieldPath) + } + + case item.ResourceFieldRef != nil: + // TODO implement DownwardAPI resourceFieldRef if needed. 
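+                // (resourceFieldRef would expose container resources, e.g. limits.cpu or requests.memory, as files.)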
+ log.G(ctx).Warningf("in pod %s unsupported DownwardAPI resourceFieldRef in InterLink, ignoring this source...", pod.Name) + + default: + log.G(ctx).Warningf("in pod %s unsupported unknown DownwardAPI in InterLink, ignoring this source...", pod.Name) + } + + } + } + return nil +} + +func remoteExecutionHandleVolumes(ctx context.Context, p *Provider, pod *v1.Pod, req *types.PodCreateRequests) error { + startTime := time.Now() + + timeNow := time.Now() + _, err := p.clientSet.CoreV1().Pods(pod.Namespace).Get(ctx, pod.Name, metav1.GetOptions{}) + if err != nil { + log.G(ctx).Warning("Deleted Pod before actual creation") + return nil + } + // Sometime the get secret or configmap can fail because it didn't have time to initialize, thus this + // is not a true failure. We use this flag to wait. + var failedAndWait bool + + log.G(ctx).Debug("Looking at volumes") + for _, volume := range pod.Spec.Volumes { + log.G(ctx).Debug("Looking at volume ", volume) + for { + failedAndWait = false + if timeNow.Sub(startTime).Seconds() < time.Hour.Minutes()*5 { + switch { + case volume.ConfigMap != nil: + cfgmap, err := p.clientSet.CoreV1().ConfigMaps(pod.Namespace).Get(ctx, volume.ConfigMap.Name, metav1.GetOptions{}) + if err != nil { + err = failedMount(ctx, &failedAndWait, volume.ConfigMap.Name, pod, p) if err != nil { - err = failedMount(ctx, &failed, volume.ConfigMap.Name, pod, p) - if err != nil { - return err - } - } else { - failed = false - req.ConfigMaps = append(req.ConfigMaps, *cfgmap) + return err } - } else if volume.Secret != nil { - scrt, err := p.clientSet.CoreV1().Secrets(pod.Namespace).Get(ctx, volume.Secret.SecretName, metav1.GetOptions{}) + } else { + req.ConfigMaps = append(req.ConfigMaps, *cfgmap) + } + + case volume.Projected != nil: + // The service account token uses the projected volume in K8S >= 1.24. 
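+                    // Each projected volume is flattened into a synthetic ConfigMap sent to the plugin:
+                    // one Data entry per projected file path.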
+ + var projectedVolume v1.ConfigMap + projectedVolume.Name = volume.Name + projectedVolume.Data = make(map[string]string) + log.G(ctx).Debug("Adding to PodCreateRequests the projected volume ", volume.Name) + req.ProjectedVolumeMaps = append(req.ProjectedVolumeMaps, projectedVolume) + + for _, source := range volume.Projected.Sources { + err := remoteExecutionHandleProjectedSource(ctx, p, pod, source, &projectedVolume) if err != nil { - err = failedMount(ctx, &failed, volume.Secret.SecretName, pod, p) - if err != nil { - return err - } + return err } else { - failed = false - req.Secrets = append(req.Secrets, *scrt) + failedAndWait = false } + log.G(ctx).Debug("ProjectedVolumeMaps len: ", len(req.ProjectedVolumeMaps)) } - if failed { - time.Sleep(time.Second) - continue - } - pod.Status.Phase = v1.PodPending - err = p.UpdatePod(ctx, pod) + case volume.Secret != nil: + scrt, err := p.clientSet.CoreV1().Secrets(pod.Namespace).Get(ctx, volume.Secret.SecretName, metav1.GetOptions{}) if err != nil { - return err + err = failedMount(ctx, &failedAndWait, volume.Secret.SecretName, pod, p) + if err != nil { + return err + } + } else { + req.Secrets = append(req.Secrets, *scrt) } - break + + case volume.EmptyDir != nil: + log.G(ctx).Debugf("empty dir found, nothing to do for volume %s for Pod %s", volume.Name, pod.Name) + + default: + log.G(ctx).Warningf("ignoring unsupported volume %s for Pod %s", volume.Name, pod.Name) } - pod.Status.Phase = v1.PodFailed - pod.Status.Reason = "CFGMaps/Secrets not found" - for i := range pod.Status.ContainerStatuses { - pod.Status.ContainerStatuses[i].Ready = false + if failedAndWait { + time.Sleep(time.Second) + continue } + pod.Status.Phase = v1.PodPending err = p.UpdatePod(ctx, pod) if err != nil { return err } - return errors.New("unable to retrieve ConfigMaps or Secrets. Check logs") + break + } + + pod.Status.Phase = v1.PodFailed + pod.Status.Reason = "CFGMaps/Secrets not found" + for i := range pod.Status.ContainerStatuses { + pod.Status.ContainerStatuses[i].Ready = false + } + err = p.UpdatePod(ctx, pod) + if err != nil { + return err } + return errors.New("unable to retrieve ConfigMaps or Secrets. Check logs") + } + } + return nil +} + +// RemoteExecution is called by the VK everytime a Pod is being registered or deleted to/from the VK. +// Depending on the mode (CREATE/DELETE), it performs different actions, making different REST calls. +// Note: for the CREATE mode, the function gets stuck up to 5 minutes waiting for every missing ConfigMap/Secret. +// If after 5m they are not still available, the function errors out +func RemoteExecution(ctx context.Context, config Config, p *Provider, pod *v1.Pod, mode int8) error { + + token := "" + if config.VKTokenFile != "" { + b, err := os.ReadFile(config.VKTokenFile) // just pass the file name + if err != nil { + log.G(ctx).Fatal(err) + return err + } + token = string(b) + } + switch mode { + case CREATE: + var req types.PodCreateRequests + var resp types.CreateStruct + + req.Pod = *pod + + err := remoteExecutionHandleVolumes(ctx, p, pod, &req) + if err != nil { + return err } + // Adds special Kubernetes env var. Note: the pod provided by VK is "immutable", well it is a copy. In InterLink, we can modify it. + addKubernetesServicesEnvVars(ctx, config, pod) + + // For debugging purpose only. 
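+        // Log the fully-injected environment of every container just before the create request is sent to the plugin.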
+ for _, container := range pod.Spec.InitContainers { + for _, envVar := range container.Env { + log.G(ctx).Debug("InterLink VK environment variable to pod ", pod.Name, " container: ", container.Name, " env: ", envVar.Name, " value: ", envVar.Value) + } + } + for _, container := range pod.Spec.Containers { + for _, envVar := range container.Env { + log.G(ctx).Debug("InterLink VK environment variable to pod ", pod.Name, " container: ", container.Name, " env: ", envVar.Name, " value: ", envVar.Value) + } + } returnVal, err := createRequest(ctx, config, req, token) if err != nil { return fmt.Errorf("error doing createRequest() in RemoteExecution() return value %s error detail %s error: %w", returnVal, fmt.Sprintf("%#v", err), err) } - log.G(ctx).Debug("Pod " + pod.Name + " with Job ID " + resp.PodJID + " before json.Unmarshal()") + log.G(ctx).Debug("Pod ", pod.Name, " with Job ID ", resp.PodJID, " before json.Unmarshal()") // get remote job ID and annotate it into the pod err = json.Unmarshal(returnVal, &resp) if err != nil { From 4e18fbd71dc1b63ddd3060b893b7f3e11bdbc84c Mon Sep 17 00:00:00 2001 From: antoinetran Date: Fri, 31 Jan 2025 14:42:33 +0100 Subject: [PATCH 3/5] fix lint --- pkg/interlink/api/func.go | 5 ++--- pkg/virtualkubelet/config.go | 6 +++--- pkg/virtualkubelet/execute.go | 35 +++++++++++++++++------------------ 3 files changed, 22 insertions(+), 24 deletions(-) diff --git a/pkg/interlink/api/func.go b/pkg/interlink/api/func.go index 3c2aa490..1e9a4e5b 100644 --- a/pkg/interlink/api/func.go +++ b/pkg/interlink/api/func.go @@ -2,7 +2,6 @@ package api import ( "context" - "path/filepath" "strings" "sync" "time" @@ -139,8 +138,8 @@ func retrieveData(ctx context.Context, config types.Config, pod types.PodCreateR case vol.EmptyDir != nil: // Deprecated: EmptyDirs is useless at VK level. It should be moved to plugin level. 
- edPath := filepath.Join(config.DataRootFolder, pod.Pod.Namespace+"-"+string(pod.Pod.UID), "emptyDirs", vol.Name) - retrievedData.EmptyDirs = append(retrievedData.EmptyDirs, edPath) + // edPath := filepath.Join(config.DataRootFolder, pod.Pod.Namespace+"-"+string(pod.Pod.UID), "emptyDirs", vol.Name) + // retrievedData.EmptyDirs = append(retrievedData.EmptyDirs, edPath) default: log.G(ctx).Warning("ignoring unsupported volume type for ", mountVar.Name) diff --git a/pkg/virtualkubelet/config.go b/pkg/virtualkubelet/config.go index c9bb4d27..4f615e9b 100644 --- a/pkg/virtualkubelet/config.go +++ b/pkg/virtualkubelet/config.go @@ -4,9 +4,9 @@ package virtualkubelet type Config struct { InterlinkURL string `yaml:"InterlinkURL"` Interlinkport string `yaml:"InterlinkPort"` - KubernetesApiAddr string `yaml:"KubernetesApiAddr"` - KubernetesApiPort string `yaml:"KubernetesApiPort"` - KubernetesApiCaCrt string `yaml:"KubernetesApiCaCrt"` + KubernetesAPIAddr string `yaml:"KubernetesApiAddr"` + KubernetesAPIPort string `yaml:"KubernetesApiPort"` + KubernetesAPICaCrt string `yaml:"KubernetesApiCaCrt"` VKConfigPath string `yaml:"VKConfigPath"` VKTokenFile string `yaml:"VKTokenFile"` ServiceAccount string `yaml:"ServiceAccount"` diff --git a/pkg/virtualkubelet/execute.go b/pkg/virtualkubelet/execute.go index 152ceece..8a740942 100644 --- a/pkg/virtualkubelet/execute.go +++ b/pkg/virtualkubelet/execute.go @@ -429,7 +429,7 @@ KUBERNETES_SERVICE_PORT_HTTPS=443 KUBERNETES_SERVICE_HOST=10.96.0.1 */ func addKubernetesServicesEnvVars(ctx context.Context, config Config, pod *v1.Pod) { - if config.KubernetesApiAddr == "" || config.KubernetesApiPort == "" { + if config.KubernetesAPIAddr == "" || config.KubernetesAPIPort == "" { log.G(ctx).Info("InterLink configuration does not contains both KubernetesApiAddr and KubernetesApiPort, so no env var like KUBERNETES_SERVICE_HOST is added.") return } @@ -443,23 +443,23 @@ func addKubernetesServicesEnvVars(ctx context.Context, config Config, pod *v1.Po } appendEnvVars := func(containersPtr *[]v1.Container, index int) { containers := *containersPtr - //container := containers[index] + // container := containers[index] envsPtr := &containers[index].Env - appendEnvVar(envsPtr, "KUBERNETES_PORT", "tcp://"+config.KubernetesApiAddr+":"+config.KubernetesApiPort) - appendEnvVar(envsPtr, "KUBERNETES_SERVICE_PORT", config.KubernetesApiPort) - appendEnvVar(envsPtr, "KUBERNETES_PORT_443_TCP_ADDR", config.KubernetesApiAddr) - appendEnvVar(envsPtr, "KUBERNETES_PORT_443_TCP_PORT", config.KubernetesApiPort) + appendEnvVar(envsPtr, "KUBERNETES_PORT", "tcp://"+config.KubernetesAPIAddr+":"+config.KubernetesAPIPort) + appendEnvVar(envsPtr, "KUBERNETES_SERVICE_PORT", config.KubernetesAPIPort) + appendEnvVar(envsPtr, "KUBERNETES_PORT_443_TCP_ADDR", config.KubernetesAPIAddr) + appendEnvVar(envsPtr, "KUBERNETES_PORT_443_TCP_PORT", config.KubernetesAPIPort) appendEnvVar(envsPtr, "KUBERNETES_PORT_443_TCP_PROTO", "tcp") - appendEnvVar(envsPtr, "KUBERNETES_PORT_443_TCP", "tcp://"+config.KubernetesApiAddr+":"+config.KubernetesApiPort) - appendEnvVar(envsPtr, "KUBERNETES_SERVICE_PORT_HTTPS", config.KubernetesApiPort) - appendEnvVar(envsPtr, "KUBERNETES_SERVICE_HOST", config.KubernetesApiAddr) + appendEnvVar(envsPtr, "KUBERNETES_PORT_443_TCP", "tcp://"+config.KubernetesAPIAddr+":"+config.KubernetesAPIPort) + appendEnvVar(envsPtr, "KUBERNETES_SERVICE_PORT_HTTPS", config.KubernetesAPIPort) + appendEnvVar(envsPtr, "KUBERNETES_SERVICE_HOST", config.KubernetesAPIAddr) } // Warning: loop range copy value, so 
to modify original containers, we must use index instead. - for i, _ := range pod.Spec.InitContainers { + for i := range pod.Spec.InitContainers { appendEnvVars(&pod.Spec.InitContainers, i) } - for i, _ := range pod.Spec.Containers { + for i := range pod.Spec.Containers { appendEnvVars(&pod.Spec.Containers, i) } @@ -477,7 +477,7 @@ func addKubernetesServicesEnvVars(ctx context.Context, config Config, pod *v1.Po } } log.G(ctx).Info("InterLink VK added a set of environment variables (e.g.: KUBERNETES_SERVICE_HOST) to all containers of pod ", - pod.Name, " k8s addr ", config.KubernetesApiAddr, " k8s port ", config.KubernetesApiPort) + pod.Name, " k8s addr ", config.KubernetesAPIAddr, " k8s port ", config.KubernetesAPIPort) } // Handle projected sources and fills the projectedVolume object. @@ -541,15 +541,15 @@ func remoteExecutionHandleProjectedSource( name: kube-root-ca.crt */ for _, item := range source.ConfigMap.Items { - KUBE_CA_CRT := "kube-root-ca.crt" - overrideCaCrt := p.config.KubernetesApiCaCrt - if source.ConfigMap.Name == KUBE_CA_CRT && overrideCaCrt != "" { + const kubeCaCrt = "kube-root-ca.crt" + overrideCaCrt := p.config.KubernetesAPICaCrt + if source.ConfigMap.Name == kubeCaCrt && overrideCaCrt != "" { log.G(ctx).Debug("handling special case of Kubernetes API kube-root-ca.crt, override found, using provided ca.crt:, ", overrideCaCrt) projectedVolume.Data[item.Path] = overrideCaCrt } else { // This gets the usual certificate for K8s API, but it is restricted to whatever usual IP/FQDN of K8S API URL. // With InterLink, the Kubernetes internal network is not accessible so this default ca.crt is probably useless. - log.G(ctx).Warning("using default Kubernetes API kube-root-ca.crt (no override found), but the default one might not be compatible with the subject: ", p.config.KubernetesApiAddr) + log.G(ctx).Warning("using default Kubernetes API kube-root-ca.crt (no override found), but the default one might not be compatible with the subject: ", p.config.KubernetesAPIAddr) cfgmap, err := p.clientSet.CoreV1().ConfigMaps(pod.Namespace).Get(ctx, source.ConfigMap.Name, metav1.GetOptions{}) if err != nil { return fmt.Errorf("error during retrieval of ConfigMap %s error: %w", source.ConfigMap.Name, err) @@ -651,9 +651,8 @@ func remoteExecutionHandleVolumes(ctx context.Context, p *Provider, pod *v1.Pod, err := remoteExecutionHandleProjectedSource(ctx, p, pod, source, &projectedVolume) if err != nil { return err - } else { - failedAndWait = false } + failedAndWait = false log.G(ctx).Debug("ProjectedVolumeMaps len: ", len(req.ProjectedVolumeMaps)) } From fc0d29dd39974828b2732c5b67d94b201a9b5b10 Mon Sep 17 00:00:00 2001 From: antoinetran Date: Fri, 31 Jan 2025 15:02:04 +0100 Subject: [PATCH 4/5] fix lint --- pkg/interlink/api/func.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/interlink/api/func.go b/pkg/interlink/api/func.go index 1e9a4e5b..4bd7546b 100644 --- a/pkg/interlink/api/func.go +++ b/pkg/interlink/api/func.go @@ -73,7 +73,7 @@ func getData(ctx context.Context, config types.Config, pod types.PodCreateReques // retrieveData retrieves ConfigMaps, Secrets and EmptyDirs. // The config is needed to specify the EmptyDirs mounting point. // It returns the retrieved data in a variable of type commonIL.RetrievedContainer and the first encountered error. 
-func retrieveData(ctx context.Context, config types.Config, pod types.PodCreateRequests, container v1.Container) (types.RetrievedContainer, error) { +func retrieveData(ctx context.Context, _ types.Config, pod types.PodCreateRequests, container v1.Container) (types.RetrievedContainer, error) { retrievedData := types.RetrievedContainer{} retrievedData.Name = container.Name for _, mountVar := range container.VolumeMounts { From 154b6ced69dd0970565501aada63cc801eb159bd Mon Sep 17 00:00:00 2001 From: antoinetran Date: Mon, 3 Feb 2025 13:42:06 +0100 Subject: [PATCH 5/5] CI/CD improve go build cache for InterLink API docker --- docker/Dockerfile.interlink | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/docker/Dockerfile.interlink b/docker/Dockerfile.interlink index 8731532e..d8343d04 100644 --- a/docker/Dockerfile.interlink +++ b/docker/Dockerfile.interlink @@ -13,8 +13,8 @@ RUN mkdir -p $GOMODCACHE && mkdir -p $GOCACHE ARG VERSION RUN bash -c "KUBELET_VERSION=${VERSION} ./cmd/virtual-kubelet/set-version.sh" -RUN go mod tidy -RUN CGO_ENABLED=0 GOOS=linux go build -o bin/interlink cmd/interlink/main.go +RUN --mount=type=cache,target=/go/pkg/mod bash -c "time go mod tidy" +RUN --mount=type=cache,target=/go/build-cache bash -c "time CGO_ENABLED=0 GOOS=linux go build -o bin/interlink cmd/interlink/main.go" # Deploy the application binary into a lean image FROM gcr.io/distroless/base-debian11:latest AS build-release-stage
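
Usage sketch (not part of any diff above): with PATCH 2/5 and PATCH 3/5 applied, the new Kubernetes API settings are read from the VK YAML configuration. The lint fix in PATCH 3/5 renamed only the Go fields (KubernetesAPIAddr, ...); the yaml tags keep the KubernetesApi* spelling, so a configuration would look roughly like the minimal sketch below. The address, port and CA content are illustrative placeholders, not defaults shipped by InterLink:

    InterlinkURL: "http://interlink.example.org"
    InterlinkPort: "3000"
    KubernetesApiAddr: "10.96.0.1"
    KubernetesApiPort: "443"
    # PEM bundle served to pods in place of the cluster kube-root-ca.crt
    KubernetesApiCaCrt: |
      -----BEGIN CERTIFICATE-----
      ...
      -----END CERTIFICATE-----

With these set, addKubernetesServicesEnvVars() injects KUBERNETES_SERVICE_HOST, KUBERNETES_SERVICE_PORT and friends into every container, and remoteExecutionHandleProjectedSource() substitutes the provided CA for the cluster's kube-root-ca.crt in projected volumes.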