neonvm: continuous synchronisation of files in pod/vm (#1222)
For #1213 I need the renewed certificate secret to be updated within the VM.

It's ok if there's some lag in this synchronisation, as certificates are renewed with a generous amount of time left until expiry.

Design:
neonvm-runner polls the watched files every 30s and compares their checksum against the files in the VM, via neonvm-daemon.
If the checksums do not match, the files are sent to neonvm-daemon, which utilises the Kubernetes atomic writer to update them.
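
For intuition, here is a minimal sketch of what such a flat-directory checksum could look like, assuming it hashes the sorted file names and contents of a single directory level. This is an illustration only; the real util.ChecksumFlatDir may differ in detail.

package sketch

import (
    "crypto/sha256"
    "encoding/hex"
    "os"
    "path/filepath"
    "sort"
)

func checksumFlatDir(dir string) (string, error) {
    entries, err := os.ReadDir(dir)
    if err != nil {
        return "", err
    }
    names := make([]string, 0, len(entries))
    for _, e := range entries {
        if e.Type().IsRegular() {
            names = append(names, e.Name())
        }
    }
    // Deterministic order, so directories with identical contents hash equal.
    sort.Strings(names)

    h := sha256.New()
    for _, name := range names {
        data, err := os.ReadFile(filepath.Join(dir, name))
        if err != nil {
            return "", err
        }
        h.Write([]byte(name))
        h.Write([]byte{0}) // separator so name/content boundaries cannot collide
        h.Write(data)
    }
    return hex.EncodeToString(h.Sum(nil)), nil
}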

Useful links:
https://ahmet.im/blog/kubernetes-secret-volumes-delay/
https://ahmet.im/blog/kubernetes-inotify/

Some discussion here:
https://neondb.slack.com/archives/C03TN5G758R/p1737723555448339
https://neondb.slack.com/archives/C03TN5G758R/p1738669103059979
conradludgate authored Feb 13, 2025
1 parent 5bd47d2 commit 14bfadb
Showing 11 changed files with 478 additions and 6 deletions.
124 changes: 118 additions & 6 deletions neonvm-daemon/cmd/main.go
@@ -1,19 +1,25 @@
package main

import (
"encoding/base64"
"encoding/json"
"flag"
"fmt"
"io"
"net/http"
"os"
"path/filepath"
"strconv"
"sync"
"time"

"go.uber.org/zap"

k8sutil "k8s.io/kubernetes/pkg/volume/util"

vmv1 "github.com/neondatabase/autoscaling/neonvm/apis/neonvm/v1"
"github.com/neondatabase/autoscaling/pkg/neonvm/cpuscaling"
"github.com/neondatabase/autoscaling/pkg/util"
)

func main() {
@@ -33,19 +39,21 @@ func main() {

logger.Info("Starting neonvm-daemon", zap.String("addr", *addr))
srv := cpuServer{
- cpuOperationsMutex: &sync.Mutex{},
- cpuScaler: cpuscaling.NewCPUScaler(),
- logger: logger.Named("cpu-srv"),
+ cpuOperationsMutex: &sync.Mutex{},
+ cpuScaler: cpuscaling.NewCPUScaler(),
+ fileOperationsMutex: &sync.Mutex{},
+ logger: logger.Named("cpu-srv"),
}
srv.run(*addr)
}

type cpuServer struct {
// Protects CPU operations from concurrent access, to prevent multiple
// ensureOnlineCPUs calls from running concurrently and to ensure that the
// status response is always up to date.
- cpuOperationsMutex *sync.Mutex
- cpuScaler *cpuscaling.CPUScaler
- logger *zap.Logger
+ cpuOperationsMutex *sync.Mutex
+ cpuScaler *cpuscaling.CPUScaler
+ fileOperationsMutex *sync.Mutex
+ logger *zap.Logger
}

func (s *cpuServer) handleGetCPUStatus(w http.ResponseWriter) {
@@ -94,6 +102,97 @@ func (s *cpuServer) handleSetCPUStatus(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
}

func (s *cpuServer) handleGetFileChecksum(w http.ResponseWriter, r *http.Request, path string) {
s.fileOperationsMutex.Lock()
defer s.fileOperationsMutex.Unlock()

if err := r.Context().Err(); err != nil {
w.WriteHeader(http.StatusRequestTimeout)
return
}

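// Kubernetes' atomic writer publishes the live file set behind a `..data`
// symlink inside the mount directory, so checksum that view rather than the
// top-level directory (which also contains the per-file symlinks).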
dir := filepath.Join(path, "..data")
checksum, err := util.ChecksumFlatDir(dir)
if err != nil {
s.logger.Error("could not checksum dir", zap.Error(err))
w.WriteHeader(http.StatusBadRequest)
return
}

w.WriteHeader(http.StatusOK)
if _, err := w.Write([]byte(checksum)); err != nil {
s.logger.Error("could not write response", zap.Error(err))
}
}

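// The PUT /files/{path} body is a JSON object mapping file names to their
// base64-encoded contents, e.g. {"tls.crt": {"data": "LS0tLS1..."}}
// (illustrative payload).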
type File struct {
// base64 encoded file contents
Data string `json:"data"`
}

func (s *cpuServer) handleUploadFile(w http.ResponseWriter, r *http.Request, path string) {
s.fileOperationsMutex.Lock()
defer s.fileOperationsMutex.Unlock()

if err := r.Context().Err(); err != nil {
w.WriteHeader(http.StatusRequestTimeout)
return
}

if r.Body == nil {
s.logger.Error("no body")
w.WriteHeader(http.StatusBadRequest)
return
}
defer r.Body.Close()

body, err := io.ReadAll(r.Body)
if err != nil {
s.logger.Error("could not ready body", zap.Error(err))
w.WriteHeader(http.StatusBadRequest)
return
}

var files map[string]File
if err := json.Unmarshal(body, &files); err != nil {
s.logger.Error("could not ready body", zap.Error(err))
w.WriteHeader(http.StatusBadRequest)
return
}

payload := make(map[string]k8sutil.FileProjection)
for k, v := range files {
data, err := base64.StdEncoding.DecodeString(v.Data)
if err != nil {
s.logger.Error("could not ready body", zap.Error(err))
w.WriteHeader(http.StatusBadRequest)
return
}
payload[k] = k8sutil.FileProjection{
Data: data,
// read-write by root
// read-only otherwise
Mode: 0o644,
FsUser: nil,
}
}

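// The atomic writer stages the new file set in a fresh timestamped directory
// and then swaps the `..data` symlink over to it, so readers inside the VM
// never observe a partially written update.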
aw, err := k8sutil.NewAtomicWriter(path, "neonvm-daemon")
if err != nil {
s.logger.Error("could not create writer", zap.Error(err))
w.WriteHeader(http.StatusBadRequest)
return
}

if err := aw.Write(payload, nil); err != nil {
s.logger.Error("could not create files", zap.Error(err))
w.WriteHeader(http.StatusInternalServerError)
return
}

w.WriteHeader(http.StatusOK)
}

func (s *cpuServer) run(addr string) {
mux := http.NewServeMux()
mux.HandleFunc("/cpu", func(w http.ResponseWriter, r *http.Request) {
@@ -108,6 +207,19 @@ func (s *cpuServer) run(addr string) {
w.WriteHeader(http.StatusNotFound)
}
})
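// Go 1.22+ ServeMux wildcard: "{path...}" captures the remainder of the URL,
// so a request for /files/vm/secrets yields PathValue("path") == "vm/secrets".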
mux.HandleFunc("/files/{path...}", func(w http.ResponseWriter, r *http.Request) {
path := fmt.Sprintf("/%s", r.PathValue("path"))
if r.Method == http.MethodGet {
s.handleGetFileChecksum(w, r, path)
return
} else if r.Method == http.MethodPut {
s.handleUploadFile(w, r, path)
return
} else {
// unknown method
w.WriteHeader(http.StatusNotFound)
}
})

timeout := 5 * time.Second
server := http.Server{
4 changes: 4 additions & 0 deletions neonvm-runner/cmd/disks.go
@@ -187,6 +187,10 @@ func createISO9660runtime(
if disk.MountPath != "" {
mounts = append(mounts, fmt.Sprintf(`/neonvm/bin/mkdir -p %s`, disk.MountPath))
}
if disk.Watch != nil && *disk.Watch {
// do nothing as we will mount it into the VM via neonvm-daemon later
continue
}
switch {
case disk.EmptyDisk != nil:
opts := ""
173 changes: 173 additions & 0 deletions neonvm-runner/cmd/main.go
@@ -30,6 +30,7 @@ import (
"k8s.io/apimachinery/pkg/api/resource"

vmv1 "github.com/neondatabase/autoscaling/neonvm/apis/neonvm/v1"
"github.com/neondatabase/autoscaling/pkg/util"
"github.com/neondatabase/autoscaling/pkg/util/taskgroup"
)

@@ -523,6 +524,8 @@ func runQEMU(
go listenForHTTPRequests(ctx, logger, vmSpec.RunnerPort, callbacks, &wg, monitoring)
wg.Add(1)
go forwardLogs(ctx, logger, &wg)
wg.Add(1)
go monitorFiles(ctx, logger, &wg, vmSpec.Disks)

qemuBin := getQemuBinaryName(cfg.architecture)
var bin string
@@ -684,6 +687,87 @@ func forwardLogs(ctx context.Context, logger *zap.Logger, wg *sync.WaitGroup) {
}
}

// monitorFiles watches a specific set of files and copies them into the guest VM via neonvm-daemon.
func monitorFiles(ctx context.Context, logger *zap.Logger, wg *sync.WaitGroup, disks []vmv1.Disk) {
defer wg.Done()

secrets := make(map[string]string)
secretsOrd := []string{}
for _, disk := range disks {
if disk.Watch != nil && *disk.Watch {
// secrets/configmaps are mounted using the atomicwriter utility,
// which loads the directory into `..data`.
dataDir := fmt.Sprintf("/vm/mounts%s/..data", disk.MountPath)
secrets[dataDir] = disk.MountPath
secretsOrd = append(secretsOrd, dataDir)
}
}

if len(secretsOrd) == 0 {
return
}

// Faster loop for the initial upload.
// The VM might need the secrets in order for postgres to actually start up,
// so it's important we sync them as soon as the daemon is available.
for {
success := true
for _, hostpath := range secretsOrd {
guestpath := secrets[hostpath]
if err := sendFilesToNeonvmDaemon(ctx, hostpath, guestpath); err != nil {
success = false
logger.Error("failed to upload file to vm guest", zap.Error(err))
}
}
if success {
break
}

select {
case <-time.After(1 * time.Second):
continue
case <-ctx.Done():
return
}
}

// For the entire duration the VM is alive, periodically check whether any of the watched disks
// still match what's inside the VM, and if not, send the update.
ticker := time.NewTicker(30 * time.Second)
defer ticker.Stop()

for {
select {
case <-ctx.Done():
return
case <-ticker.C:
// for each secret we are tracking
for hostpath, guestpath := range secrets {
// get the checksum for the pod directory
hostsum, err := util.ChecksumFlatDir(hostpath)
if err != nil {
logger.Error("failed to get dir checksum from host", zap.Error(err), zap.String("dir", hostpath))
continue
}

// get the checksum for the VM directory
guestsum, err := getFileChecksumFromNeonvmDaemon(ctx, guestpath)
if err != nil {
logger.Error("failed to get dir checksum from guest", zap.Error(err), zap.String("dir", guestpath))
continue
}

// if not equal, update the files inside the VM.
if guestsum != hostsum {
if err = sendFilesToNeonvmDaemon(ctx, hostpath, guestpath); err != nil {
logger.Error("failed to upload files to vm guest", zap.Error(err))
}
}
}
}
}
}

func terminateQemuOnSigterm(ctx context.Context, logger *zap.Logger, wg *sync.WaitGroup) {
logger = logger.Named("terminate-qemu-on-sigterm")

@@ -796,3 +880,92 @@ func checkNeonvmDaemonCPU() error {
}
return nil
}

type File struct {
// base64 encoded file contents
Data string `json:"data"`
}

func sendFilesToNeonvmDaemon(ctx context.Context, hostpath, guestpath string) error {
_, vmIP, _, err := calcIPs(defaultNetworkCIDR)
if err != nil {
return fmt.Errorf("could not calculate VM IP address: %w", err)
}

files, err := util.ReadAllFiles(hostpath)
if err != nil && !os.IsNotExist(err) {
return fmt.Errorf("could not open file: %w", err)
}

encodedFiles := make(map[string]File)
for k, v := range files {
encodedFiles[k] = File{Data: base64.StdEncoding.EncodeToString(v)}
}
body, err := json.Marshal(encodedFiles)
if err != nil {
return fmt.Errorf("could not encode files: %w", err)
}

ctx, cancel := context.WithTimeout(ctx, time.Second)
defer cancel()

// guestpath has a leading forward slash
url := fmt.Sprintf("http://%s:25183/files%s", vmIP, guestpath)

req, err := http.NewRequestWithContext(ctx, http.MethodPut, url, bytes.NewReader(body))
if err != nil {
return fmt.Errorf("could not build request: %w", err)
}

client := &http.Client{
CheckRedirect: func(req *http.Request, via []*http.Request) error {
return http.ErrUseLastResponse
},
}
resp, err := client.Do(req)
if err != nil {
return fmt.Errorf("could not send request: %w", err)
}
defer resp.Body.Close()

if resp.StatusCode != 200 {
return fmt.Errorf("neonvm-daemon responded with status %d", resp.StatusCode)
}

return nil
}

func getFileChecksumFromNeonvmDaemon(ctx context.Context, guestpath string) (string, error) {
_, vmIP, _, err := calcIPs(defaultNetworkCIDR)
if err != nil {
return "", fmt.Errorf("could not calculate VM IP address: %w", err)
}

ctx, cancel := context.WithTimeout(ctx, time.Second)
defer cancel()

// guestpath has a leading forward slash
url := fmt.Sprintf("http://%s:25183/files%s", vmIP, guestpath)

req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, http.NoBody)
if err != nil {
return "", fmt.Errorf("could not build request: %w", err)
}

resp, err := http.DefaultClient.Do(req)
if err != nil {
return "", fmt.Errorf("could not send request: %w", err)
}
defer resp.Body.Close()

if resp.StatusCode != 200 {
return "", fmt.Errorf("neonvm-daemon responded with status %d", resp.StatusCode)
}

checksum, err := io.ReadAll(resp.Body)
if err != nil {
return "", fmt.Errorf("could not read response: %w", err)
}

return string(checksum), nil
}
6 changes: 6 additions & 0 deletions neonvm/apis/neonvm/v1/virtualmachine_types.go
@@ -511,6 +511,12 @@ type Disk struct {
// Path within the virtual machine at which the disk should be mounted. Must
// not contain ':'.
MountPath string `json:"mountPath"`
// If true, the disk source is monitored for changes; otherwise (false or unspecified) it is only read on VM startup.
// This only works if the disk source is a configmap, a secret, or a projected volume.
// Defaults to false.
// +optional
// +kubebuilder:default:=false
Watch *bool `json:"watch,omitempty"`
// DiskSource represents the location and type of the mounted disk.
DiskSource `json:",inline"`
}
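
As a hypothetical usage sketch (using only the fields visible in this diff; the mount path is made up), a certificate secret disk that opts into continuous sync could be declared like:

watch := true
certDisk := vmv1.Disk{
    MountPath: "/var/tls", // hypothetical mount point
    Watch:     &watch,
    // DiskSource (inline): a secret, configmap, or projected volume source
    // goes here; watch has no effect for other source types.
}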
