Skip to content

Commit

Permalink
Move config update logic into pkg (just enough to compile and test ok)
Browse files Browse the repository at this point in the history
  • Loading branch information
uvegla committed Feb 6, 2025
1 parent 5a0eb00 commit fed8b05
Show file tree
Hide file tree
Showing 6 changed files with 330 additions and 292 deletions.
288 changes: 5 additions & 283 deletions cmd/kustomizepatch/runner.go
Original file line number Diff line number Diff line change
@@ -1,20 +1,13 @@
package kustomizepatch

import (
"bytes"
"context"
"crypto/tls"
"encoding/json"
"fmt"
"io"
"net/http"
"os"
"path"
"path/filepath"
"strings"
"time"

"github.com/fluxcd/pkg/tar"
applicationv1alpha1 "github.com/giantswarm/apiextensions-application/api/v1alpha1"
"github.com/giantswarm/app/v7/pkg/app"
"github.com/giantswarm/microerror"
Expand All @@ -28,6 +21,7 @@ import (
kyaml "sigs.k8s.io/kustomize/kyaml/yaml"
"sigs.k8s.io/yaml"

cfg "github.com/giantswarm/konfigure/pkg/config"
"github.com/giantswarm/konfigure/pkg/meta"
"github.com/giantswarm/konfigure/pkg/service"
"github.com/giantswarm/konfigure/pkg/sopsenv/key"
Expand All @@ -40,9 +34,8 @@ const (

// cacheDir is a directory where konfigure will keep its cache if it's
// running in cluster and talking to source-controller.
cacheDir = "/tmp/konfigure-cache"
cacheLastArchive = "lastarchive"
cacheLastArchiveTimestamp = "lastarchivetimestamp"
cacheDir = "/tmp/konfigure-cache"

// dirEnvVar is a directory containing giantswarm/config. If set, requests
// to source-controller will not be made and both sourceServiceEnvVar and
// gitRepositoryEnvVar will be ignored. Used only on local machine for
Expand All @@ -51,29 +44,13 @@ const (
// installationEnvVar tells konfigure which installation it's running in,
// e.g. "ginger"
installationEnvVar = "KONFIGURE_INSTALLATION"
// gitRepositoryEnvVar is namespace/name of GitRepository pointing to
// giantswarm/config, e.g. "flux-system/gs-config"
gitRepositoryEnvVar = "KONFIGURE_GITREPO"
// kubernetesServiceEnvVar is K8S host of the Kubernetes API service.
kubernetesServiceHostEnvVar = "KUBERNETES_SERVICE_HOST"
// kubernetesServicePortEnvVar is K8S port of the Kubernetes API service.
kubernetesServicePortEnvVar = "KUBERNETES_SERVICE_PORT"
// kubernetesToken holds the location of the Kubernetes Service Account
// token mount within a Pod.
kubernetesTokenFile = "/var/run/secrets/kubernetes.io/serviceaccount/token" // #nosec G101

// sopsKeysDirEnvVar tells Konfigure how to configure environment to make
// it possible for SOPS to find the keys
sopsKeysDirEnvVar = "KONFIGURE_SOPS_KEYS_DIR"
// sopsKeysSourceEnvVar tells Konfigure to either get keys from Kubernetes
// Secrets or rely on local storage when setting up environment for SOPS
sopsKeysSourceEnvVar = "KONFIGURE_SOPS_KEYS_SOURCE"
// v1SourceAPIGroup holds Flux Source group and v1 version
v1SourceAPIGroup = "source.toolkit.fluxcd.io/v1"
// v1beta2SourceAPIGroup holds Flux Source group and v1beta2 version
v1beta2SourceAPIGroup = "source.toolkit.fluxcd.io/v1beta2"
// sourceServiceEnvVar is K8s address of source-controller's service, e.g.
// "source-controller.flux-system.svc"
sourceServiceEnvVar = "KONFIGURE_SOURCE_SERVICE"
)

type runner struct {
Expand Down Expand Up @@ -141,7 +118,7 @@ func (r *runner) run(items []*kyaml.RNode) ([]*kyaml.RNode, error) {
dir = os.Getenv(dirEnvVar)
// Else, we download the packaged config from source-controller.
if dir == "" {
if err := r.updateConfig(); err != nil {
if err := cfg.UpdateConfig(cacheDir); err != nil {
return nil, microerror.Mask(err)
}
dir = path.Join(cacheDir, "latest")
Expand Down Expand Up @@ -260,261 +237,6 @@ func (r *runner) run(items []*kyaml.RNode) ([]*kyaml.RNode, error) {
return output, nil
}

// updateConfig makes sure that the giantswarm/config version we keep stashed
// in <cacheDir>/latest is still *the* latest version out there. In order to do that,
// it sends a HEAD request for the last known artifact to the Source Controller,
// in order to check it is still available. If so, it then skips further processing.
// Otherwise it contacts the GitRepository resource for the new artifact's URL.
// The URL is then used to download a new version of the archive and untar it.
// The archive name is being saved for later comparison.
func (r *runner) updateConfig() error {
return r.updateConfigWithParams(cacheDir, kubernetesTokenFile)
}

func (r *runner) updateConfigWithParams(cache, token string) error {
// Get source-controller's service URL and GitRepository data from
// environment variables. We use this data to construct an URL to
// source-controller's artifact.
svc := os.Getenv(sourceServiceEnvVar)
if svc == "" {
return microerror.Maskf(executionFailedError, "%q environment variable not set", sourceServiceEnvVar)
}

repo := os.Getenv(gitRepositoryEnvVar)
if repo == "" {
return microerror.Maskf(executionFailedError, "%q environment variable not set", gitRepositoryEnvVar)
}

// We first get the 'lastarchive' file, because it contains the name of the artifact we have
// been using up until now. If the file is gone, it means we haven't populated the cache yet,
// hence we must do it now. If the file is present, but archive of the given name is no longer
// advertised by the Source Controller, we must look for a new one and re-populate the cache. If the
// file is present, and is still advertised by the Source Controller, all is good and we may return.
cachedArtifact, err := os.ReadFile(path.Join(cache, cacheLastArchive))
if err != nil && os.IsNotExist(err) {
cachedArtifact = []byte("placeholder.tar.gz")
} else if err != nil {
return microerror.Mask(err)
}

cachedArtifactTimestampByte, err := os.ReadFile(path.Join(cache, cacheLastArchiveTimestamp))
if err != nil && os.IsNotExist(err) {
cachedArtifactTimestampByte = []byte(time.Time{}.Format(http.TimeFormat))
} else if err != nil {
return microerror.Mask(err)
}

cachedArtifactTimestamp, err := time.Parse(http.TimeFormat, string(cachedArtifactTimestampByte))
if err != nil {
return microerror.Mask(err)
}

url := fmt.Sprintf("http://%s/gitrepository/%s/%s", svc, repo, string(cachedArtifact))
// Make a HEAD request to the Source Controller. This allows us to check if the artifact
// we have cached is still offered.
client := &http.Client{Timeout: 60 * time.Second}
request, err := http.NewRequest(http.MethodHead, url, nil)
if err != nil {
return microerror.Mask(err)
}

response, err := client.Do(request)
if err != nil {
return microerror.Mask(err)
}
defer response.Body.Close()

// The artifact we were asking for is still advertised by the Source Controller,
// and has not changed since the last time, hence we may skip further processing.
if response.StatusCode == http.StatusOK {
// The artifact we are asking for is still available, we need to check its
// last modification date
artifactTimestamp, err := time.Parse(http.TimeFormat, response.Header.Get("Last-Modified"))
if err != nil {
return microerror.Mask(err)
}

if cachedArtifactTimestamp.After(artifactTimestamp) || cachedArtifactTimestamp.Equal(artifactTimestamp) {
return nil
}
} else {
if response.StatusCode != http.StatusNotFound {
return microerror.Maskf(
executionFailedError,
"error calling %q: expected %d, got %d", request.URL, http.StatusNotFound, response.StatusCode,
)
} else {
url = ""
}
}

// When latest known revision is still available, there is no need to query the API Server
// for the GitRepository, it saves us one call.
if url == "" {
// The artifact we were asking for is gone, we must find the newly advertised one,
// hence we query the Kubernetes API Server for the GitRepository CR resource.
k8sApiHost := os.Getenv(kubernetesServiceHostEnvVar)
if svc == "" {
return microerror.Maskf(executionFailedError, "%q environment variable not set", kubernetesServiceHostEnvVar)
}

k8sApiPort := os.Getenv(kubernetesServicePortEnvVar)
if svc == "" {
return microerror.Maskf(executionFailedError, "%q environment variable not set", kubernetesServicePortEnvVar)
}

repoCoordinates := strings.Split(repo, "/")

k8sApiPath := []string{
fmt.Sprintf(
"https://%s:%s/apis/%s/namespaces/%s/gitrepositories/%s",
k8sApiHost,
k8sApiPort,
v1SourceAPIGroup,
repoCoordinates[0],
repoCoordinates[1],
),
fmt.Sprintf(
"https://%s:%s/apis/%s/namespaces/%s/gitrepositories/%s",
k8sApiHost,
k8sApiPort,
v1beta2SourceAPIGroup,
repoCoordinates[0],
repoCoordinates[1],
),
}

k8sToken, err := os.ReadFile(token)
if err != nil {
return microerror.Mask(err)
}

bearer := fmt.Sprintf("Bearer %s", strings.TrimSpace(string(k8sToken)))

// Make a GET request to the Kubernetes API server to get the GitRepository
// in a JSON format.
for _, p := range k8sApiPath {
request, err = http.NewRequest(http.MethodGet, p, nil)
if err != nil {
return microerror.Mask(err)
}

request.Header.Set("Authorization", bearer)
request.Header.Add("Accept", "application/json")

tr := &http.Transport{TLSClientConfig: &tls.Config{InsecureSkipVerify: true}} // nolint:gosec
client.Transport = tr

response, err = client.Do(request)
if err != nil {
return microerror.Mask(err)
}
defer response.Body.Close()

if response.StatusCode == http.StatusOK {
break
}

if response.StatusCode == http.StatusNotFound {
continue
}

return microerror.Maskf(
executionFailedError,
"error calling %q: expected %d, got %d", request.URL, http.StatusOK, response.StatusCode,
)
}

if response.StatusCode != http.StatusOK {
return microerror.Maskf(
executionFailedError,
"error getting '%s' GitRepository CR", repo,
)
}

responseBytes, err := io.ReadAll(response.Body)
if err != nil {
return microerror.Mask(err)
}

// We are not interested in an entire object, we are only interested in getting
// some of the status fields that advertise the new archive.
type gitRepository struct {
Status struct {
Artifact struct {
Url string
}
}
}

var gr gitRepository
err = json.Unmarshal(responseBytes, &gr)
if err != nil {
return microerror.Mask(err)
}

// Note: technically this does not mean an error. An empty field could be a symptom
// of the CR still being reconciled, or not being picked up by the Source Controller
// at all, in which case, we could simply skip quietly.
if gr.Status.Artifact.Url == "" {
return microerror.Maskf(
executionFailedError,
"error downloading artifact: got empty URL from GitRepository status",
)
}

url = gr.Status.Artifact.Url
}

request, err = http.NewRequest(http.MethodGet, url, nil)
if err != nil {
return microerror.Mask(err)
}

response, err = client.Do(request)
if err != nil {
return microerror.Mask(err)
}
if response.StatusCode != http.StatusOK {
return microerror.Maskf(
executionFailedError,
"error calling %q: expected %d, got %d", request.URL, http.StatusOK, response.StatusCode,
)
}
defer response.Body.Close()

var buf bytes.Buffer
_, err = io.Copy(&buf, response.Body)
if err != nil {
return microerror.Mask(err)
}

// Clear the old artifact's directory and untar a fresh one.
dir := path.Join(cache, "latest")
if err := os.RemoveAll(dir); err != nil {
return microerror.Mask(err)
}
if err := os.MkdirAll(dir, 0755); err != nil {
return microerror.Mask(err)
}
if err = tar.Untar(&buf, dir); err != nil {
return microerror.Mask(err)
}

// Update the last archive name and timestamp
err = os.WriteFile(path.Join(cache, cacheLastArchive), []byte(filepath.Base(url)), 0755) // nolint:gosec
if err != nil {
return microerror.Mask(err)
}

err = os.WriteFile(path.Join(cache, cacheLastArchiveTimestamp), []byte(response.Header.Get("Last-Modified")), 0755) // nolint:gosec
if err != nil {
return microerror.Mask(err)
}

return nil
}

func addNameSuffix(name string) string {
if len(name) >= 63-len(nameSuffix)-1 {
name = name[:63-len(nameSuffix)-1]
Expand Down
Loading

0 comments on commit fed8b05

Please sign in to comment.