Skip to content

Commit ed98913

Browse files
committed
Add reconciler sharding capability based on label
With this enhancement, the controller can be configured with `--watch-label-selector`, after which only objects with this label will be reconciled by the controller. This allows for horizontal scaling of the source-controller, where each controller can be deployed multiple times with a unique label selector which is used as the sharding key. Note that this also requires configuration of the `--storage-adv-addr` to a unique address (in combination with a proper Service definition). This to ensure the Artifacts handled by the sharding controller point to a unique endpoint. In addition, Source object kinds which have a dependency on another kind (i.e. a HelmChart on a HelmRepository) need to have the same labels applied to work as expected. Signed-off-by: Hidde Beydals <[email protected]>
1 parent 51dea22 commit ed98913

File tree

3 files changed

+33
-13
lines changed

3 files changed

+33
-13
lines changed

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ require (
2929
github.com/fluxcd/pkg/lockedfile v0.1.0
3030
github.com/fluxcd/pkg/masktoken v0.2.0
3131
github.com/fluxcd/pkg/oci v0.21.1
32-
github.com/fluxcd/pkg/runtime v0.33.0
32+
github.com/fluxcd/pkg/runtime v0.34.0
3333
github.com/fluxcd/pkg/sourceignore v0.3.3
3434
github.com/fluxcd/pkg/ssh v0.7.3
3535
github.com/fluxcd/pkg/testserver v0.4.0

go.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -550,8 +550,8 @@ github.com/fluxcd/pkg/masktoken v0.2.0 h1:HoSPTk4l1fz5Fevs2vVRvZGru33blfMwWSZKsH
550550
github.com/fluxcd/pkg/masktoken v0.2.0/go.mod h1:EA7GleAHL33kN6kTW06m5R3/Q26IyuGO7Ef/0CtpDI0=
551551
github.com/fluxcd/pkg/oci v0.21.1 h1:9kn19wkabE2xB77NRlOtMJlSYhZmUjdloZCzlHdAS6s=
552552
github.com/fluxcd/pkg/oci v0.21.1/go.mod h1:9E2DBlQII7YmeWt2ieTh38wwkiBqx3yg5NEJ51uefaA=
553-
github.com/fluxcd/pkg/runtime v0.33.0 h1:y6mFOj22mU/BXAxSTucTlT7vrWUjd0+iccK0pRN5CF0=
554-
github.com/fluxcd/pkg/runtime v0.33.0/go.mod h1:oDTerqMMtOQVNZeidwAPG7g/ai2xuidUduJzQh1IBVI=
553+
github.com/fluxcd/pkg/runtime v0.34.0 h1:vnwsCZcJtD9iE7K8d4rpE6YSYFWDrFOdA85Poagyp8s=
554+
github.com/fluxcd/pkg/runtime v0.34.0/go.mod h1:oDTerqMMtOQVNZeidwAPG7g/ai2xuidUduJzQh1IBVI=
555555
github.com/fluxcd/pkg/sourceignore v0.3.3 h1:Ue29JAuPECEYdvIqdpXpQaDxpeySn7amarLArp7XoIs=
556556
github.com/fluxcd/pkg/sourceignore v0.3.3/go.mod h1:yuJzKggph0Bdbk9LgXjJQhvJZSTJV/1vS7mJuB7mPa0=
557557
github.com/fluxcd/pkg/ssh v0.7.3 h1:Dhs+nXdp806lBriUJtPyRi0SVIVWbJafJGD/qQ71GiY=

main.go

Lines changed: 30 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -28,11 +28,13 @@ import (
2828
flag "github.com/spf13/pflag"
2929
"helm.sh/helm/v3/pkg/getter"
3030
corev1 "k8s.io/api/core/v1"
31+
"k8s.io/apimachinery/pkg/labels"
3132
"k8s.io/apimachinery/pkg/runtime"
3233
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
3334
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
3435
_ "k8s.io/client-go/plugin/pkg/client/auth/gcp"
3536
ctrl "sigs.k8s.io/controller-runtime"
37+
ctrlcache "sigs.k8s.io/controller-runtime/pkg/cache"
3638
ctrlclient "sigs.k8s.io/controller-runtime/pkg/client"
3739

3840
"github.com/fluxcd/pkg/git"
@@ -45,16 +47,16 @@ import (
4547
"github.com/fluxcd/pkg/runtime/pprof"
4648
"github.com/fluxcd/pkg/runtime/probes"
4749

48-
"github.com/fluxcd/source-controller/internal/digest"
49-
"github.com/fluxcd/source-controller/internal/features"
50-
"github.com/fluxcd/source-controller/internal/helm/registry"
50+
"github.com/fluxcd/source-controller/api/v1"
51+
"github.com/fluxcd/source-controller/api/v1beta2"
52+
// +kubebuilder:scaffold:imports
5153

52-
v1 "github.com/fluxcd/source-controller/api/v1"
53-
v1beta2 "github.com/fluxcd/source-controller/api/v1beta2"
5454
"github.com/fluxcd/source-controller/controllers"
5555
"github.com/fluxcd/source-controller/internal/cache"
56+
"github.com/fluxcd/source-controller/internal/digest"
57+
"github.com/fluxcd/source-controller/internal/features"
5658
"github.com/fluxcd/source-controller/internal/helm"
57-
// +kubebuilder:scaffold:imports
59+
"github.com/fluxcd/source-controller/internal/helm/registry"
5860
)
5961

6062
const controllerName = "source-controller"
@@ -92,7 +94,6 @@ func main() {
9294
storageAdvAddr string
9395
concurrent int
9496
requeueDependency time.Duration
95-
watchAllNamespaces bool
9697
helmIndexLimit int64
9798
helmChartLimit int64
9899
helmChartFileLimit int64
@@ -101,6 +102,7 @@ func main() {
101102
leaderElectionOptions leaderelection.Options
102103
rateLimiterOptions helper.RateLimiterOptions
103104
featureGates feathelper.FeatureGates
105+
watchOptions helper.WatchOptions
104106
helmCacheMaxSize int
105107
helmCacheTTL string
106108
helmCachePurgeInterval string
@@ -121,8 +123,6 @@ func main() {
121123
flag.StringVar(&storageAdvAddr, "storage-adv-addr", envOrDefault("STORAGE_ADV_ADDR", ""),
122124
"The advertised address of the static file server.")
123125
flag.IntVar(&concurrent, "concurrent", 2, "The number of concurrent reconciles per controller.")
124-
flag.BoolVar(&watchAllNamespaces, "watch-all-namespaces", true,
125-
"Watch for custom resources in all namespaces, if set to false it will only watch the runtime namespace.")
126126
flag.Int64Var(&helmIndexLimit, "helm-index-max-size", helm.MaxIndexSize,
127127
"The max allowed size in bytes of a Helm repository index file.")
128128
flag.Int64Var(&helmChartLimit, "helm-chart-max-size", helm.MaxChartSize,
@@ -153,6 +153,7 @@ func main() {
153153
leaderElectionOptions.BindFlags(flag.CommandLine)
154154
rateLimiterOptions.BindFlags(flag.CommandLine)
155155
featureGates.BindFlags(flag.CommandLine)
156+
watchOptions.BindFlags(flag.CommandLine)
156157

157158
flag.Parse()
158159

@@ -180,10 +181,28 @@ func main() {
180181
helm.MaxChartFileSize = helmChartFileLimit
181182

182183
watchNamespace := ""
183-
if !watchAllNamespaces {
184+
if !watchOptions.AllNamespaces {
184185
watchNamespace = os.Getenv("RUNTIME_NAMESPACE")
185186
}
186187

188+
var newSelectingCache ctrlcache.NewCacheFunc
189+
watchSelector, err := helper.GetWatchSelector(watchOptions)
190+
if err != nil {
191+
setupLog.Error(err, "unable to configure watch label selector")
192+
os.Exit(1)
193+
}
194+
if watchSelector != labels.Everything() {
195+
newSelectingCache = ctrlcache.BuilderWithOptions(ctrlcache.Options{
196+
SelectorsByObject: ctrlcache.SelectorsByObject{
197+
&v1.GitRepository{}: {Label: watchSelector},
198+
&v1beta2.HelmRepository{}: {Label: watchSelector},
199+
&v1beta2.HelmChart{}: {Label: watchSelector},
200+
&v1beta2.Bucket{}: {Label: watchSelector},
201+
&v1beta2.OCIRepository{}: {Label: watchSelector},
202+
},
203+
})
204+
}
205+
187206
var disableCacheFor []ctrlclient.Object
188207
shouldCache, err := features.Enabled(features.CacheSecretsAndConfigMaps)
189208
if err != nil {
@@ -209,6 +228,7 @@ func main() {
209228
Namespace: watchNamespace,
210229
Logger: ctrl.Log,
211230
ClientDisableCacheFor: disableCacheFor,
231+
NewCache: newSelectingCache,
212232
})
213233
if err != nil {
214234
setupLog.Error(err, "unable to start manager")

0 commit comments

Comments
 (0)