Commit 445ac85
Support remote clusters and arbitrary types
This commit adds the infrastructure for watching and querying arbitrarily-typed pipeline targets, in remote clusters as well as the local cluster.

The basic shape is this: for each target that needs to be examined, the reconciler uses `watchTargetAndGetReader(..., target)`. This procedure encapsulates the detail of making sure there's a cache for the target's cluster and type, and supplies the `client.Reader` needed for fetching the target object.

A `cache.Cache` is kept for each {cluster, type}. The `cache.Cache` is the smallest piece of machinery that can be torn down, because the next layer down, `Informer` objects, can't be removed once created. This is important for being able to stop watching targets when they are no longer targets.

Target object updates will come from all the caches, which come and (in principle) go; but the handler must be statically installed in `SetupWithManager()`. So, targets are looked up in an index to get the corresponding pipeline (if there is one), and that pipeline is put into a `source.Channel`. The channel source multiplexes the dynamic event handlers into a static pipeline requeue handler.

NB:

* I've put the remote cluster test in its own Test* wrapper, because it needs to start another testenv to be the remote cluster.

* Supporting arbitrary types means using `unstructured.Unstructured` when querying for target objects, and this complicates checking their status. Since the caches are per-type, in theory there could be code for querying known types (HelmRelease and Kustomization), with `Unstructured` as a fallback. So long as the object passed to `watchTargetAndGetReader(...)` is the same one used with `client.Get(...)`, it should all work.

* A cache per {cluster, type} is not the only possible scheme. The watching could be more precise -- meaning fewer spurious events, and narrower permissions needed -- by having a cache per {cluster, namespace, type}, with the trade-off of managing more goroutines, and other overheads. I've chosen the chunkier scheme based on an informed guess that it'll be more efficient for low numbers of clusters and targets.
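To make the shape above concrete, here is a minimal sketch of the event flow, assuming the controller-runtime APIs this commit uses (`cache.New`, `Manager.Add`, informer event handlers, and a `source.Channel` registered via `WatchesRawSource`). The names `watcher`, `ensureWatch` and `addChannelWatch` are illustrative, not the ones in the diff below:

package sketch

import (
	"context"

	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/client-go/rest"
	toolscache "k8s.io/client-go/tools/cache"
	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/builder"
	"sigs.k8s.io/controller-runtime/pkg/cache"
	"sigs.k8s.io/controller-runtime/pkg/client"
	"sigs.k8s.io/controller-runtime/pkg/event"
	"sigs.k8s.io/controller-runtime/pkg/handler"
	"sigs.k8s.io/controller-runtime/pkg/source"
)

// One cache per {cluster, type}; the cache is the unit that can be torn down later.
type clusterAndGVK struct {
	client.ObjectKey
	schema.GroupVersionKind
}

// watcher is a hypothetical holder for the pieces the reconciler keeps.
type watcher struct {
	mgr       ctrl.Manager
	caches    map[clusterAndGVK]cache.Cache
	appEvents chan event.GenericEvent // multiplexes events from every dynamic cache
}

// ensureWatch creates (or reuses) the cache for one cluster/type and wires its
// informer to the shared channel. Locking and index lookups are elided here.
func (w *watcher) ensureWatch(ctx context.Context, key clusterAndGVK, cfg *rest.Config, obj client.Object) error {
	c, ok := w.caches[key]
	if !ok {
		var err error
		if c, err = cache.New(cfg, cache.Options{Scheme: w.mgr.GetScheme()}); err != nil {
			return err
		}
		if err := w.mgr.Add(c); err != nil { // the manager starts (and later stops) the cache
			return err
		}
		w.caches[key] = c
	}
	inf, err := c.GetInformer(ctx, obj)
	if err != nil {
		return err
	}
	// The real controller maps the event object to its Pipeline via an index;
	// here the object is forwarded as-is to keep the sketch short.
	_, err = inf.AddEventHandler(toolscache.ResourceEventHandlerFuncs{
		AddFunc: func(o interface{}) {
			if co, ok := o.(client.Object); ok {
				w.appEvents <- event.GenericEvent{Object: co}
			}
		},
	})
	return err
}

// addChannelWatch is the static side: SetupWithManager registers the channel once,
// so handlers added later by ensureWatch all feed the same requeue path.
func (w *watcher) addChannelWatch(b *builder.Builder) *builder.Builder {
	return b.WatchesRawSource(&source.Channel{Source: w.appEvents}, &handler.EnqueueRequestForObject{})
}

Because the channel watch is registered exactly once, caches created later for new clusters or new target types can feed events into the same requeue path without touching SetupWithManager again.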
1 parent b4d8755 commit 445ac85

File tree

10 files changed: +2351, -126 lines


config/testdata/crds/kustomization.yaml: +1,629 lines (large diff not rendered by default)

controllers/leveltriggered/controller.go: +234, -29

@@ -3,26 +3,42 @@ package leveltriggered
 import (
 	"context"
 	"fmt"
+	"sync"
 
-	helmv2 "github.com/fluxcd/helm-controller/api/v2beta1"
 	clusterctrlv1alpha1 "github.com/weaveworks/cluster-controller/api/v1alpha1"
 	corev1 "k8s.io/api/core/v1"
 	apierrors "k8s.io/apimachinery/pkg/api/errors"
 	apimeta "k8s.io/apimachinery/pkg/api/meta"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
 	"k8s.io/apimachinery/pkg/runtime"
+	"k8s.io/apimachinery/pkg/runtime/schema"
 	"k8s.io/apimachinery/pkg/types"
+	"k8s.io/client-go/rest"
+	toolscache "k8s.io/client-go/tools/cache"
+	"k8s.io/client-go/tools/clientcmd"
 	"k8s.io/client-go/tools/record"
+	capicfg "sigs.k8s.io/cluster-api/util/kubeconfig"
 	ctrl "sigs.k8s.io/controller-runtime"
+	"sigs.k8s.io/controller-runtime/pkg/cache"
 	"sigs.k8s.io/controller-runtime/pkg/client"
+	"sigs.k8s.io/controller-runtime/pkg/client/apiutil"
+	"sigs.k8s.io/controller-runtime/pkg/event"
 	"sigs.k8s.io/controller-runtime/pkg/handler"
 	"sigs.k8s.io/controller-runtime/pkg/log"
+	"sigs.k8s.io/controller-runtime/pkg/source"
 
 	"github.com/weaveworks/pipeline-controller/api/v1alpha1"
 	"github.com/weaveworks/pipeline-controller/pkg/conditions"
 	"github.com/weaveworks/pipeline-controller/server/strategy"
 )
 
+// clusterAndGVK is used as the key for caches
+type clusterAndGVK struct {
+	client.ObjectKey
+	schema.GroupVersionKind
+}
+
 // PipelineReconciler reconciles a Pipeline object
 type PipelineReconciler struct {
 	client.Client
@@ -31,6 +47,11 @@ type PipelineReconciler struct {
 	ControllerName string
 	recorder       record.EventRecorder
 	stratReg       strategy.StrategyRegistry
+
+	caches    map[clusterAndGVK]cache.Cache
+	cachesMu  *sync.Mutex
+	manager   ctrl.Manager
+	appEvents chan event.GenericEvent
 }
 
 func NewPipelineReconciler(c client.Client, s *runtime.Scheme, controllerName string, eventRecorder record.EventRecorder, stratReg strategy.StrategyRegistry) *PipelineReconciler {
@@ -43,6 +64,9 @@ func NewPipelineReconciler(c client.Client, s *runtime.Scheme, controllerName st
 		recorder:       eventRecorder,
 		ControllerName: controllerName,
 		stratReg:       stratReg,
+		caches:         make(map[clusterAndGVK]cache.Cache),
+		cachesMu:       &sync.Mutex{},
+		appEvents:      make(chan event.GenericEvent),
 	}
 }
 
@@ -120,22 +144,33 @@ func (r *PipelineReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
 			}
 		}
 
-		// it's OK if this is `nil` -- that represents the local cluster.
-		clusterClient, err := r.getClusterClient(clusterObject)
+		targetObj, err := targetObject(&pipeline, &target)
+		if err != nil {
+			targetStatus.Error = fmt.Sprintf("target spec could not be interpreted as an object: %s", err.Error())
+			unready = true
+			continue
+		}
+
+		// it's OK if clusterObject is still `nil` -- that represents the local cluster.
+		clusterClient, ok, err := r.watchTargetAndGetReader(ctx, clusterObject, targetObj)
 		if err != nil {
 			return ctrl.Result{}, err
 		}
+		if !ok {
+			targetStatus.Error = "Target cluster client is not synced"
+			unready = true
+			continue
+		}
 
+		targetKey := client.ObjectKeyFromObject(targetObj)
 		// look up the actual application
-		var app helmv2.HelmRelease // FIXME this can be other kinds!
-		appKey := targetObjectKey(&pipeline, &target)
-		err = clusterClient.Get(ctx, appKey, &app)
+		err = clusterClient.Get(ctx, targetKey, targetObj)
 		if err != nil {
 			r.emitEventf(
 				&pipeline,
 				corev1.EventTypeWarning,
 				"GetAppError", "Failed to get application object %s%s/%s for pipeline %s/%s: %s",
-				clusterPrefix(target.ClusterRef), appKey.Namespace, appKey.Name,
+				clusterPrefix(target.ClusterRef), targetKey.Namespace, targetKey.Name,
 				pipeline.GetNamespace(), pipeline.GetName(),
 				err,
 			)
@@ -144,7 +179,7 @@ func (r *PipelineReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
 			unready = true
 			continue
 		}
-		setTargetStatus(targetStatus, &app)
+		setTargetStatus(targetStatus, targetObj)
 	}
 }
 
@@ -331,35 +366,206 @@ func checkAllTargetsAreReady(env *v1alpha1.EnvironmentStatus) bool {
 	return true
 }
 
-// getClusterClient retrieves or creates a client for the cluster in question. A `nil` value for the argument indicates the local cluster.
-func (r *PipelineReconciler) getClusterClient(cluster *clusterctrlv1alpha1.GitopsCluster) (client.Client, error) {
-	if cluster == nil {
-		return r.Client, nil
+// watchTargetAndGetReader ensures that the type (GroupVersionKind) of the target in the cluster given is being watched, and
+// returns a client.Reader.
+// A nil for the `clusterObject` argument indicates the local cluster. It returns an error if there was a problem, and otherwise
+// a client.Reader and a bool which is true if the type was already watched and syncing.
+//
+// NB: the target object should be the object that is used as the `client.Object` argument for client.Get(...); the cache has different
+// stores for {typed values, partial values, unstructured values}, and we want to install the watcher in the same one as we will later query.
+func (r *PipelineReconciler) watchTargetAndGetReader(ctx context.Context, clusterObject *clusterctrlv1alpha1.GitopsCluster, target client.Object) (client.Reader, bool, error) {
+	var clusterKey client.ObjectKey
+	if clusterObject != nil {
+		clusterKey = client.ObjectKeyFromObject(clusterObject)
 	}
-	// TODO future: get the secret via the cluster object, connect to remote cluster
-	return nil, fmt.Errorf("remote clusters not supported yet")
-}
 
-// targetObjectKey returns the object key (namespaced name) for a target. The Pipeline is passed in as well as the Target, since the definition can be spread between these specs.
-func targetObjectKey(pipeline *v1alpha1.Pipeline, target *v1alpha1.Target) client.ObjectKey {
-	key := client.ObjectKey{}
-	key.Name = pipeline.Spec.AppRef.Name
-	key.Namespace = target.Namespace
-	return key
+	targetGVK, err := apiutil.GVKForObject(target, r.targetScheme)
+	if err != nil {
+		return nil, false, err
+	}
+	cacheKey := clusterAndGVK{
+		ObjectKey:        clusterKey,
+		GroupVersionKind: targetGVK,
+	}
+
+	logger := log.FromContext(ctx).WithValues("component", "target cache", "cluster", clusterKey, "name", client.ObjectKeyFromObject(target), "type", targetGVK)
+
+	r.cachesMu.Lock()
+	typeCache, cacheFound := r.caches[cacheKey]
+	r.cachesMu.Unlock()
+	// To construct a cache, we need a *rest.Config. There's two ways to get one:
+	//  - for the local cluster, we can just get the already prepared one from the Manager;
+	//  - for a remote cluster, we can construct one given a kubeconfig; and the kubeconfig will be stored in a Secret,
+	//    associated with the object representing the cluster.
+	if !cacheFound {
+		logger.Info("creating cache for cluster and type", "cluster", clusterKey, "type", targetGVK)
+		var cfg *rest.Config
+
+		if clusterObject == nil {
+			cfg = r.manager.GetConfig()
+		} else {
+			var kubeconfig []byte
+			var err error
+			switch {
+			case clusterObject.Spec.CAPIClusterRef != nil:
+				var capiKey client.ObjectKey
+				capiKey.Name = clusterObject.Spec.CAPIClusterRef.Name
+				capiKey.Namespace = clusterObject.GetNamespace()
+				kubeconfig, err = capicfg.FromSecret(ctx, r.Client, capiKey)
+				if err != nil {
+					return nil, false, err
+				}
+			case clusterObject.Spec.SecretRef != nil:
+				var secretKey client.ObjectKey
+				secretKey.Name = clusterObject.Spec.SecretRef.Name
+				secretKey.Namespace = clusterObject.GetNamespace()
+
+				var sec corev1.Secret
+				if err := r.Get(ctx, secretKey, &sec); err != nil {
+					return nil, false, err
+				}
+				var ok bool
+				kubeconfig, ok = sec.Data["kubeconfig"]
+				if !ok {
+					return nil, false, fmt.Errorf("referenced Secret does not have data key %s", "kubeconfig")
+				}
+			default:
+				return nil, false, fmt.Errorf("GitopsCluster object has neither .secretRef nor .capiClusterRef populated, unable to get remote cluster config")
+			}
+			cfg, err = clientcmd.RESTConfigFromKubeConfig(kubeconfig)
+			if err != nil {
+				return nil, false, err
+			}
+		}
+
+		// having done all that, did we really need it?
+		r.cachesMu.Lock()
+		if typeCache, cacheFound = r.caches[cacheKey]; !cacheFound {
+			c, err := cache.New(cfg, cache.Options{
+				Scheme: r.targetScheme,
+			})
+			if err != nil {
+				r.cachesMu.Unlock()
+				return nil, false, err
+			}
+
+			// this must be done with the lock held, because if it fails, we can't use the cache and shouldn't add it to the map.
+			if err := r.manager.Add(c); err != nil { // this will start it asynchronously
+				r.cachesMu.Unlock()
+				return nil, false, err
+			}
+
+			typeCache = c
+			r.caches[cacheKey] = typeCache
+		}
+		r.cachesMu.Unlock()
+	}
+
+	// Now we have a cache; make sure the object type in question is being watched, so we can query it and get updates.
+
+	// The informer is retrieved whether we created the cache or not, because we want to know if it's synced and thus ready to be queried.
+	inf, err := typeCache.GetInformer(ctx, target) // NB not InformerForKind(...), because that uses the typed value cache specifically (see the method comment).
+	if err != nil {
+		return nil, false, err
+	}
+
+	if !cacheFound { // meaning: we created the cache, this time around
+		enqueuePipelinesForTarget := func(obj interface{}) {
+			eventObj, ok := obj.(client.Object)
+			if !ok {
+				logger.Info("value to look up in index was not a client.Object", "object", eventObj)
+				return
+			}
+			pipelines, err := r.pipelinesForApplication(clusterKey, eventObj)
+			if err != nil {
+				logger.Error(err, "failed to look up pipelines in index of applications")
+				return
+			}
+			// TODO is passing pointers here dangerous? (do they get copied, I think they might do). Alternative is to pass the whole list in the channel.
+			for i := range pipelines {
+				r.appEvents <- event.GenericEvent{Object: &pipelines[i]}
+			}
+		}
+
+		_, err := inf.AddEventHandler(toolscache.ResourceEventHandlerFuncs{
+			AddFunc: func(obj interface{}) {
+				enqueuePipelinesForTarget(obj)
+			},
+			DeleteFunc: func(obj interface{}) {
+				enqueuePipelinesForTarget(obj)
+			},
+			UpdateFunc: func(oldObj, newObj interface{}) {
+				// We're just looking up the name in the index so it'll be the same for the old as for the new object.
+				// However, this might change elsewhere, so to be defensive, run both. The queue will deduplicate,
+				// though it means we do a bit more lookup work.
+				enqueuePipelinesForTarget(oldObj)
+				enqueuePipelinesForTarget(newObj)
+			},
+		})
+		if err != nil {
+			return nil, false, err
+		}
+	}
+
+	return typeCache, inf.HasSynced(), nil
 }
 
-// clusterPrefix returns a string naming the cluster containing an app, to prepend to the usual namespace/name format of the app object itself. So that it can be empty, the separator is include in the return value.
+// clusterPrefix returns a string naming the cluster containing an app, to prepend to the usual namespace/name format of the app object itself.
+// So that it can be empty, the separator is include in the return value.
 func clusterPrefix(ref *v1alpha1.CrossNamespaceClusterReference) string {
 	if ref == nil {
 		return ""
 	}
 	return fmt.Sprintf("%s/%s:", ref.Namespace, ref.Name)
 }
 
+// targetObject returns a target object for a target spec, ready to be queried. The Pipeline is passed in as well as the Target,
+// since the definition can be spread between these specs. This is coupled with setTargetStatus because the concrete types returned here
+// must be handled by setTargetStatus.
+func targetObject(pipeline *v1alpha1.Pipeline, target *v1alpha1.Target) (client.Object, error) {
+	var obj unstructured.Unstructured
+	gv, err := schema.ParseGroupVersion(pipeline.Spec.AppRef.APIVersion)
+	if err != nil {
+		return nil, err
+	}
+	gvk := gv.WithKind(pipeline.Spec.AppRef.Kind)
+	obj.GetObjectKind().SetGroupVersionKind(gvk)
+	obj.SetName(pipeline.Spec.AppRef.Name)
+	obj.SetNamespace(target.Namespace)
+	return &obj, nil
+}
+
 // setTargetStatus gets the relevant status from the app object given, and records it in the TargetStatus.
-func setTargetStatus(status *v1alpha1.TargetStatus, target *helmv2.HelmRelease) {
-	status.Revision = target.Status.LastAppliedRevision
-	status.Ready = conditions.IsReady(target.Status.Conditions)
+func setTargetStatus(status *v1alpha1.TargetStatus, targetObject client.Object) {
+	switch obj := targetObject.(type) {
+	case *unstructured.Unstructured:
+		// this assumes it's a Flux-like object; specifically with
+		//  - a Ready condition
+		//  - a .status.lastAppliedRevision
+		conds, ok, err := unstructured.NestedSlice(obj.Object, "status", "conditions")
+		if !ok || err != nil {
+			status.Ready = false
+			return
+		}
+		status.Ready = conditions.IsReadyUnstructured(conds)
+
+		lastAppliedRev, ok, err := unstructured.NestedString(obj.Object, "status", "lastAppliedRevision")
+		if !ok || err != nil {
+			// It's not an error to lack a Ready condition (new objects will lack any conditions), and it's not an error to lack a lastAppliedRevision
+			// (maybe it hasn't got that far yet); but it is an error to have a ready condition of true and lack a lastAppliedRevision, since that means
+			// the object is not a usable target.
+			if status.Ready {
+				status.Error = "unable to find .status.lastAppliedRevision in ready target object"
+				status.Ready = false
+				return
+			}
+		} else {
+			status.Revision = lastAppliedRev
+		}
+	default:
+		status.Error = "unable to determine ready status for object"
+		status.Ready = false
+	}
 }
 
 func (r *PipelineReconciler) patchStatus(ctx context.Context, n types.NamespacedName, newStatus v1alpha1.PipelineStatus) error {
@@ -387,6 +593,8 @@ func (r *PipelineReconciler) getCluster(ctx context.Context, p v1alpha1.Pipeline
 
 // SetupWithManager sets up the controller with the Manager.
 func (r *PipelineReconciler) SetupWithManager(mgr ctrl.Manager) error {
+	r.manager = mgr
+
 	const (
 		gitopsClusterIndexKey string = ".spec.environment.ClusterRef" // this is arbitrary, but let's make it suggest what it's indexing.
 	)
@@ -411,10 +619,7 @@ func (r *PipelineReconciler) SetupWithManager(mgr ctrl.Manager) error {
 			&clusterctrlv1alpha1.GitopsCluster{},
 			handler.EnqueueRequestsFromMapFunc(r.requestsForCluster(gitopsClusterIndexKey)),
 		).
-		Watches(
-			&helmv2.HelmRelease{},
-			handler.EnqueueRequestsFromMapFunc(r.requestsForApplication),
-		).
+		WatchesRawSource(&source.Channel{Source: r.appEvents}, &handler.EnqueueRequestForObject{}).
 		Complete(r)
 }
 