Leaderelection for clusterstatus
p-se committed Feb 20, 2025
1 parent ff011df commit f9aea73
Showing 3 changed files with 131 additions and 62 deletions.
171 changes: 115 additions & 56 deletions internal/cmd/agent/clusterstatus.go
@@ -3,16 +3,23 @@ package agent
import (
"context"
"fmt"
glog "log"
"os"
"time"

"github.com/spf13/cobra"

"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/discovery"
memory "k8s.io/client-go/discovery/cached"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/restmapper"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/tools/leaderelection"
"k8s.io/client-go/tools/leaderelection/resourcelock"
"k8s.io/client-go/util/workqueue"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/log"
@@ -87,75 +94,127 @@ func (cs *ClusterStatus) Run(cmd *cobra.Command, args []string) error {
localConfig := rest.CopyConfig(kc)
localConfig.RateLimiter = ratelimit.None

// cannot start without kubeconfig for upstream cluster
setupLog.Info("Fetching kubeconfig for upstream cluster from registration", "namespace", cs.Namespace)
agentInfo, _, err := register.Get(ctx, cs.Namespace, localConfig)
if err != nil {
setupLog.Error(err, "failed to get kubeconfig from upstream cluster")
return err
}

// set up factory for upstream cluster
fleetNamespace, _, err := agentInfo.ClientConfig.Namespace()
if err != nil {
setupLog.Error(err, "failed to get namespace from upstream cluster")
return err
}

fleetRESTConfig, err := agentInfo.ClientConfig.ClientConfig()
localClient, err := kubernetes.NewForConfig(kc)
if err != nil {
setupLog.Error(err, "failed to get kubeconfig from upstream cluster")
return err
}

// now we have both configs
fleetMapper, mapper, _, err := newMappers(ctx, fleetRESTConfig, clientConfig)
if err != nil {
setupLog.Error(err, "failed to get mappers")
return err
return fmt.Errorf("failed to create local client: %w", err)
}

fleetSharedFactory, err := newSharedControllerFactory(fleetRESTConfig, fleetMapper, fleetNamespace)
identifier, err := getUniqueIdentifier()
if err != nil {
setupLog.Error(err, "failed to build shared controller factory")
return err
return fmt.Errorf("failed to get unique identifier: %w", err)
}

fleetFactory, err := fleet.NewFactoryFromConfigWithOptions(fleetRESTConfig, &fleet.FactoryOptions{
SharedControllerFactory: fleetSharedFactory,
})
if err != nil {
setupLog.Error(err, "failed to build fleet factory")
return err
lock := resourcelock.LeaseLock{
LeaseMeta: metav1.ObjectMeta{
Name: leaseLockNameClusterStatus,
Namespace: cs.Namespace,
},
Client: localClient.CoordinationV1(),
LockConfig: resourcelock.ResourceLockConfig{
Identity: identifier,
},
}

// set up factory for local cluster
localFactory, err := newSharedControllerFactory(localConfig, mapper, "")
leaderOpts, err := command.NewLeaderElectionOptions()
if err != nil {
setupLog.Error(err, "failed to build shared controller factory")
return err
}

coreFactory, err := core.NewFactoryFromConfigWithOptions(localConfig, &core.FactoryOptions{
SharedControllerFactory: localFactory,
})
if err != nil {
setupLog.Error(err, "failed to build core factory")
return err
glog.Println("leaderOpts", leaderOpts)

leaderElectionConfig := leaderelection.LeaderElectionConfig{
Lock: &lock,
LeaseDuration: *leaderOpts.LeaseDuration,
RetryPeriod: *leaderOpts.RetryPeriod,
RenewDeadline: *leaderOpts.RenewDeadline,
Callbacks: leaderelection.LeaderCallbacks{
OnStartedLeading: func(ctx context.Context) {
// cannot start without kubeconfig for upstream cluster
setupLog.Info("Fetching kubeconfig for upstream cluster from registration", "namespace", cs.Namespace)
agentInfo, _, err := register.Get(ctx, cs.Namespace, localConfig)
if err != nil {
setupLog.Error(err, "failed to get kubeconfig from upstream cluster")
return
}

// set up factory for upstream cluster
fleetNamespace, _, err := agentInfo.ClientConfig.Namespace()
if err != nil {
setupLog.Error(err, "failed to get namespace from upstream cluster")
return
}

fleetRESTConfig, err := agentInfo.ClientConfig.ClientConfig()
if err != nil {
setupLog.Error(err, "failed to get kubeconfig from upstream cluster")
return
}

// now we have both configs
fleetMapper, mapper, _, err := newMappers(ctx, fleetRESTConfig, clientConfig)
if err != nil {
setupLog.Error(err, "failed to get mappers")
return
}

fleetSharedFactory, err := newSharedControllerFactory(fleetRESTConfig, fleetMapper, fleetNamespace)
if err != nil {
setupLog.Error(err, "failed to build shared controller factory")
return
}

fleetFactory, err := fleet.NewFactoryFromConfigWithOptions(fleetRESTConfig, &fleet.FactoryOptions{
SharedControllerFactory: fleetSharedFactory,
})
if err != nil {
setupLog.Error(err, "failed to build fleet factory")
return
}

// set up factory for local cluster
localFactory, err := newSharedControllerFactory(localConfig, mapper, "")
if err != nil {
setupLog.Error(err, "failed to build shared controller factory")
return
}

coreFactory, err := core.NewFactoryFromConfigWithOptions(localConfig, &core.FactoryOptions{
SharedControllerFactory: localFactory,
})
if err != nil {
setupLog.Error(err, "failed to build core factory")
return
}

setupLog.Info("Starting cluster status ticker", "checkin interval", checkinInterval.String(), "cluster namespace", agentInfo.ClusterNamespace, "cluster name", agentInfo.ClusterName)

go func() {
clusterstatus.Ticker(ctx,
cs.Namespace,
agentInfo.ClusterNamespace,
agentInfo.ClusterName,
checkinInterval,
coreFactory.Core().V1().Node(),
fleetFactory.Fleet().V1alpha1().Cluster(),
)

<-cmd.Context().Done()
}()
},
OnStoppedLeading: func() {
setupLog.Info("stopped leading")
os.Exit(1)
},
OnNewLeader: func(identity string) {
if identity == identifier {
setupLog.Info("renewed leader", "new identity", identity, "own identity", identifier)
} else {
setupLog.Info("new leader", "new identity", identity, "own identity", identifier)
}
},
},
}

setupLog.Info("Starting cluster status ticker", "checkin interval", checkinInterval.String(), "cluster namespace", agentInfo.ClusterNamespace, "cluster name", agentInfo.ClusterName)

clusterstatus.Ticker(ctx,
cs.Namespace,
agentInfo.ClusterNamespace,
agentInfo.ClusterName,
checkinInterval,
coreFactory.Core().V1().Node(),
fleetFactory.Fleet().V1alpha1().Cluster(),
)

<-cmd.Context().Done()
leaderelection.RunOrDie(ctx, leaderElectionConfig)

return nil
}
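
The new wiring in clusterstatus.go follows the standard client-go leader-election pattern: build a resourcelock.LeaseLock, wrap it in a LeaderElectionConfig, and hand both to leaderelection.RunOrDie, which blocks while campaigning and invokes OnStartedLeading once the Lease is acquired. A minimal, self-contained sketch of that pattern follows — the lease name is taken from the diff, while the namespace, identity, and timing values are assumptions, not fleet's actual configuration:

package main

import (
	"context"
	"os"
	"time"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/leaderelection"
	"k8s.io/client-go/tools/leaderelection/resourcelock"
)

func main() {
	cfg, err := rest.InClusterConfig()
	if err != nil {
		panic(err)
	}
	client := kubernetes.NewForConfigOrDie(cfg)

	// The identity must be unique per candidate; the hostname works for pods.
	id, err := os.Hostname()
	if err != nil {
		panic(err)
	}

	lock := &resourcelock.LeaseLock{
		LeaseMeta: metav1.ObjectMeta{
			Name:      "fleet-agent-clusterstatus", // lease name from the diff
			Namespace: "cattle-fleet-system",       // assumed namespace
		},
		Client:     client.CoordinationV1(),
		LockConfig: resourcelock.ResourceLockConfig{Identity: id},
	}

	// RunOrDie blocks: it campaigns for the lease, runs OnStartedLeading
	// with a context that is cancelled when leadership is lost, and then
	// calls OnStoppedLeading.
	leaderelection.RunOrDie(context.Background(), leaderelection.LeaderElectionConfig{
		Lock:          lock,
		LeaseDuration: 15 * time.Second, // assumed timings, not fleet's flags
		RenewDeadline: 10 * time.Second,
		RetryPeriod:   2 * time.Second,
		Callbacks: leaderelection.LeaderCallbacks{
			OnStartedLeading: func(ctx context.Context) {
				// Real code would start the cluster status ticker here.
				<-ctx.Done()
			},
			OnStoppedLeading: func() {
				// Exiting non-zero lets the kubelet restart the pod,
				// which then rejoins the election as a follower.
				os.Exit(1)
			},
		},
	})
}
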
9 changes: 7 additions & 2 deletions internal/cmd/agent/register.go
@@ -73,7 +73,7 @@ func (r *Register) Run(cmd *cobra.Command, args []string) error {

lock := resourcelock.LeaseLock{
LeaseMeta: metav1.ObjectMeta{
Name: leaseLockName,
Name: leaseLockNameRegister,
Namespace: r.Namespace,
},
Client: localClient.CoordinationV1(),
@@ -124,10 +124,15 @@ func (r *Register) Run(cmd *cobra.Command, args []string) error {
os.Exit(0)
},
OnStoppedLeading: func() {
setupLog.Info("stopped leading")
os.Exit(1)
},
OnNewLeader: func(identity string) {
setupLog.Info("new leader", "identity", identity)
if identity == identifier {
setupLog.Info("renewed leader", "identity", identity)
} else {
setupLog.Info("new leader", "identity", identity)
}
},
},
}
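
With each subcommand now holding its own Lease, it becomes possible to check which process owns which lock directly against the coordination API. A small debugging sketch, assuming the usual client-go imports and an existing clientset — the helper name and its use are hypothetical, not part of fleet:

// printLeaseHolder is a hypothetical debugging helper: it reads back the
// Lease object that resourcelock.LeaseLock manages and reports its holder.
func printLeaseHolder(ctx context.Context, client kubernetes.Interface, namespace, name string) error {
	lease, err := client.CoordinationV1().Leases(namespace).Get(ctx, name, metav1.GetOptions{})
	if err != nil {
		return fmt.Errorf("failed to get lease %s/%s: %w", namespace, name, err)
	}
	if lease.Spec.HolderIdentity != nil {
		fmt.Printf("lease %s/%s held by %q\n", namespace, name, *lease.Spec.HolderIdentity)
	}
	return nil
}

Called as printLeaseHolder(ctx, localClient, r.Namespace, leaseLockNameRegister), this prints the identity recorded by the current leader.
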
13 changes: 9 additions & 4 deletions internal/cmd/agent/root.go
@@ -41,7 +41,9 @@ type AgentReconcilerWorkers struct {
}

const (
leaseLockName = "fleet-agent"
leaseLockName = "fleet-agent"
leaseLockNameClusterStatus = "fleet-agent-clusterstatus"
leaseLockNameRegister = "fleet-agent-register"
)

var (
@@ -147,11 +149,14 @@ func (a *FleetAgent) Run(cmd *cobra.Command, args []string) error {
},
OnStoppedLeading: func() {
setupLog.Info("stopped leading")
setupLog.Info("exit here?")
// os.Exit(0)
os.Exit(1)
},
OnNewLeader: func(identity string) {
setupLog.Info("new leader", "identity", identity)
if identity == identifier {
setupLog.Info("renewed leader", "identity", identity)
} else {
setupLog.Info("new leader", "identity", identity)
}
},
},
}
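
All three elections rely on the getUniqueIdentifier helper, whose body is not part of this diff. A plausible implementation — a hypothetical sketch, not fleet's actual code — combines the hostname with a random suffix so that restarted pods never reuse a stale identity:

import (
	"fmt"
	"os"

	"k8s.io/apimachinery/pkg/util/rand"
)

// getUniqueIdentifier is a hypothetical stand-in for the helper used above:
// hostname plus a random suffix keeps identities unique across restarts.
func getUniqueIdentifier() (string, error) {
	hostname, err := os.Hostname()
	if err != nil {
		return "", fmt.Errorf("failed to get hostname: %w", err)
	}
	return fmt.Sprintf("%s-%s", hostname, rand.String(8)), nil
}
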
