Skip to content
This repository was archived by the owner on Feb 18, 2025. It is now read-only.

Commit 810dc80

Browse files
authored
Add basic support for resource quota (#143)
* add basic support for resource quota * fix declaration error * add resourcequotas to RBAC * add resourcequotas to helm cluster roles * fix kubebuilder directive * cleanup * fixed deepcopy
1 parent 5d6d5fa commit 810dc80

File tree

9 files changed

+360
-9
lines changed

9 files changed

+360
-9
lines changed

api/v1beta1/appwrapper_types.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -222,7 +222,7 @@ type CustomPodResource struct {
222222
Requests v1.ResourceList `json:"requests"`
223223

224224
// Limits per replica
225-
NotImplemented_Limits v1.ResourceList `json:"limits,omitempty"`
225+
Limits v1.ResourceList `json:"limits,omitempty"`
226226
}
227227

228228
// State transition

api/v1beta1/zz_generated.deepcopy.go

+2-2
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

config/rbac/role.yaml

+8
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,14 @@ rules:
3434
- patch
3535
- update
3636
- watch
37+
- apiGroups:
38+
- ""
39+
resources:
40+
- resourcequotas
41+
verbs:
42+
- get
43+
- list
44+
- watch
3745
- apiGroups:
3846
- apps
3947
resources:

deployment/mcad-controller/templates/rbac/clusterrole.yaml

+8
Original file line numberDiff line numberDiff line change
@@ -270,3 +270,11 @@ rules:
270270
- subjectaccessreviews
271271
verbs:
272272
- create
273+
- apiGroups:
274+
- ""
275+
resources:
276+
- resourcequotas
277+
verbs:
278+
- get
279+
- list
280+
- watch

internal/controller/appwrapper_controller.go

+1
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ import (
3434
//+kubebuilder:rbac:groups=workload.codeflare.dev,resources=appwrappers,verbs=get;list;watch;create;update;patch;delete
3535
//+kubebuilder:rbac:groups=workload.codeflare.dev,resources=appwrappers/status,verbs=get;update;patch
3636
//+kubebuilder:rbac:groups=workload.codeflare.dev,resources=appwrappers/finalizers,verbs=update
37+
//+kubebuilder:rbac:groups="",resources=resourcequotas,verbs=get;list;watch
3738

3839
// AppWrapperReconciler is the super type of Dispatcher and Runner reconcilers
3940
type AppWrapperReconciler struct {

internal/controller/dispatch_logic.go

+41-5
Original file line numberDiff line numberDiff line change
@@ -157,7 +157,7 @@ func (r *Dispatcher) listAppWrappers(ctx context.Context) (map[int]Weights, []*m
157157
}
158158

159159
// Find next AppWrapper to dispatch in queue order
160-
func (r *Dispatcher) selectForDispatch(ctx context.Context) ([]*mcadv1beta1.AppWrapper, error) {
160+
func (r *Dispatcher) selectForDispatch(ctx context.Context, quotatracker *QuotaTracker) ([]*mcadv1beta1.AppWrapper, error) {
161161
selected := []*mcadv1beta1.AppWrapper{}
162162
logThisDispatch := time.Now().After(r.NextLoggedDispatch)
163163
if logThisDispatch {
@@ -204,13 +204,38 @@ func (r *Dispatcher) selectForDispatch(ctx context.Context) ([]*mcadv1beta1.AppW
204204
// return ordered slice of AppWrappers that fit (may be empty)
205205
for _, appWrapper := range queue {
206206
request := aggregateRequests(appWrapper)
207+
// get resourceQuota in AppWrapper namespace, if any
208+
resourceQuotas := &v1.ResourceQuotaList{}
209+
namespace := appWrapper.GetNamespace()
210+
if err := r.List(ctx, resourceQuotas, client.UnsafeDisableDeepCopy,
211+
&client.ListOptions{Namespace: namespace}); err != nil {
212+
return nil, err
213+
}
214+
quotaFits := true
215+
var appWrapperAskWeights *WeightsPair
216+
insufficientResources := []v1.ResourceName{}
217+
if len(resourceQuotas.Items) > 0 {
218+
appWrapperAskWeights = getWeightsPairForAppWrapper(appWrapper)
219+
// assuming only one resourceQuota per nameSpace
220+
quotaFits, insufficientResources = quotatracker.Satisfies(appWrapperAskWeights, &resourceQuotas.Items[0])
221+
}
207222
fits, gaps := request.Fits(available[int(appWrapper.Spec.Priority)])
208223
if fits {
209-
selected = append(selected, appWrapper.DeepCopy()) // deep copy AppWrapper
210-
for priority, avail := range available {
211-
if priority <= int(appWrapper.Spec.Priority) {
212-
avail.Sub(request)
224+
// check if appwrapper passes resource quota (if any)
225+
if quotaFits {
226+
quotatracker.Allocate(namespace, appWrapperAskWeights)
227+
selected = append(selected, appWrapper.DeepCopy()) // deep copy AppWrapper
228+
for priority, avail := range available {
229+
if priority <= int(appWrapper.Spec.Priority) {
230+
avail.Sub(request)
231+
}
232+
}
233+
} else {
234+
var msgBuilder strings.Builder
235+
for _, resource := range insufficientResources {
236+
msgBuilder.WriteString(fmt.Sprintf("Insufficient %v. ", resource))
213237
}
238+
r.Decisions[appWrapper.UID] = &QueuingDecision{reason: mcadv1beta1.QueuedInsufficientQuota, message: msgBuilder.String()}
214239
}
215240
} else {
216241
var msgBuilder strings.Builder
@@ -237,6 +262,17 @@ func aggregateRequests(appWrapper *mcadv1beta1.AppWrapper) Weights {
237262
return request
238263
}
239264

265+
// Aggregate limits
266+
func aggregateLimits(appWrapper *mcadv1beta1.AppWrapper) Weights {
267+
limit := Weights{}
268+
for _, r := range appWrapper.Spec.Resources.GenericItems {
269+
for _, cpr := range r.CustomPodResources {
270+
limit.AddProd(cpr.Replicas, NewWeights(cpr.Limits))
271+
}
272+
}
273+
return limit
274+
}
275+
240276
// Propagate reservations at all priority levels to all levels below
241277
func assertPriorities(w map[int]Weights) {
242278
keys := make([]int, len(w))

internal/controller/dispatcher.go

+46-1
Original file line numberDiff line numberDiff line change
@@ -22,10 +22,12 @@ import (
2222
"fmt"
2323
"time"
2424

25+
v1 "k8s.io/api/core/v1"
2526
"k8s.io/apimachinery/pkg/api/meta"
2627
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2728
"k8s.io/apimachinery/pkg/types"
2829
ctrl "sigs.k8s.io/controller-runtime"
30+
"sigs.k8s.io/controller-runtime/pkg/client"
2931
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
3032
"sigs.k8s.io/controller-runtime/pkg/event"
3133
"sigs.k8s.io/controller-runtime/pkg/handler"
@@ -225,8 +227,14 @@ func (r *Dispatcher) triggerDispatch() {
225227

226228
// Attempt to select and dispatch appWrappers until either capacity is exhausted or no candidates remain
227229
func (r *Dispatcher) dispatch(ctx context.Context) (ctrl.Result, error) {
230+
// track quota allocation to AppWrappers during a dispatching cycle;
231+
// used in only one cycle, does not carry from cycle to cycle
232+
quotaTracker := NewQuotaTracker()
233+
if weightsPairMap, err := r.getUnadmittedPodsWeights(ctx); err == nil {
234+
quotaTracker.Init(weightsPairMap)
235+
}
228236
// find dispatch candidates according to priorities, precedence, and available resources
229-
selectedAppWrappers, err := r.selectForDispatch(ctx)
237+
selectedAppWrappers, err := r.selectForDispatch(ctx, quotaTracker)
230238
if err != nil {
231239
return ctrl.Result{}, err
232240
}
@@ -257,3 +265,40 @@ func (r *Dispatcher) dispatch(ctx context.Context) (ctrl.Result, error) {
257265

258266
return ctrl.Result{RequeueAfter: dispatchDelay}, nil
259267
}
268+
269+
// Calculate resource demands of pods for appWrappers that have been dispatched but haven't
270+
// passed through ResourceQuota admission controller yet (approximated by resources not created yet)
271+
func (r *Dispatcher) getUnadmittedPodsWeights(ctx context.Context) (map[string]*WeightsPair, error) {
272+
appWrappers := &mcadv1beta1.AppWrapperList{}
273+
if err := r.List(ctx, appWrappers, client.UnsafeDisableDeepCopy); err != nil {
274+
return nil, err
275+
}
276+
weightsPairMap := make(map[string]*WeightsPair)
277+
for _, appWrapper := range appWrappers.Items {
278+
_, step := r.getCachedAW(&appWrapper)
279+
if step != mcadv1beta1.Idle {
280+
namespace := appWrapper.GetNamespace()
281+
weightsPair := weightsPairMap[namespace]
282+
if weightsPair == nil {
283+
weightsPair = NewWeightsPair(Weights{}, Weights{})
284+
}
285+
weightsPair.Add(getWeightsPairForAppWrapper(&appWrapper))
286+
287+
// subtract weights for admitted (created) pods for this appWrapper
288+
// (already accounted for in the used status of the resourceQuota)
289+
pods := &v1.PodList{}
290+
if err := r.List(ctx, pods, client.UnsafeDisableDeepCopy,
291+
client.MatchingLabels{namespaceLabel: namespace, nameLabel: appWrapper.Name}); err == nil {
292+
createdPodsWeightsPair := &WeightsPair{requests: Weights{}, limits: Weights{}}
293+
for _, pod := range pods.Items {
294+
createdPodsWeightsPair.Add(NewWeightsPairForPod(&pod))
295+
}
296+
weightsPair.Sub(createdPodsWeightsPair)
297+
}
298+
nonNegativeWeightsPair := NewWeightsPair(Weights{}, Weights{})
299+
nonNegativeWeightsPair.Max(weightsPair)
300+
weightsPairMap[namespace] = nonNegativeWeightsPair
301+
}
302+
}
303+
return weightsPairMap, nil
304+
}

internal/controller/quota_tracker.go

+149
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,149 @@
1+
/*
2+
Copyright 2023 IBM Corporation.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package controller
18+
19+
import (
20+
"strings"
21+
22+
mcadv1beta1 "github.com/project-codeflare/mcad/api/v1beta1"
23+
v1 "k8s.io/api/core/v1"
24+
)
25+
26+
// Should be defined in api/core/v1/types.go
27+
const DefaultResourceLimitsPrefix = "limits."
28+
29+
// A tracker of allocated quota, mapped by namespace
30+
type QuotaTracker struct {
31+
// state of quotas, used, and allocated amounts
32+
state map[string]*QuotaState
33+
34+
// used amounts by dispatched AppWrappers partially unaccounted by the ResourceQuota,
35+
// as some pods may have not passed the ResourceQuota admission controller
36+
unAdmittedWeightsMap map[string]*WeightsPair
37+
}
38+
39+
// Create a new QuotaTracker
40+
func NewQuotaTracker() *QuotaTracker {
41+
return &QuotaTracker{
42+
state: map[string]*QuotaState{},
43+
unAdmittedWeightsMap: map[string]*WeightsPair{},
44+
}
45+
}
46+
47+
// State includes total quota, used quota, and currently allocated quota
48+
type QuotaState struct {
49+
// quota enforced in the ResourceQuota object
50+
quota *WeightsPair
51+
// used amount in the status of the ResourceQuota object
52+
used *WeightsPair
53+
// allocated amount by dispatched AppWrappers in the current dispatching cycle
54+
allocated *WeightsPair
55+
}
56+
57+
// Create a QuotaState from a ResourceQuota object
58+
func NewQuotaStateFromResourceQuota(resourceQuota *v1.ResourceQuota) *QuotaState {
59+
quotaWeights, usedWeights := getQuotaAndUsedWeightsPairsForResourceQuota(resourceQuota)
60+
return &QuotaState{
61+
quota: quotaWeights,
62+
used: usedWeights,
63+
allocated: NewWeightsPair(Weights{}, Weights{}),
64+
}
65+
}
66+
67+
// Account for all in-flight AppWrappers with their resource demand not yet reflected in
68+
// the Used status of any ResourceQuota object in their corresponding namespace
69+
func (tracker *QuotaTracker) Init(weightsPairMap map[string]*WeightsPair) {
70+
tracker.unAdmittedWeightsMap = weightsPairMap
71+
}
72+
73+
// Check if the resource demand of an AppWrapper satisfies a ResourceQuota,
74+
// without changing the current quota allocation, returning resource names with insufficient quota
75+
func (tracker *QuotaTracker) Satisfies(appWrapperAskWeights *WeightsPair, resourceQuota *v1.ResourceQuota) (bool, []v1.ResourceName) {
76+
namespace := resourceQuota.GetNamespace()
77+
var quotaState *QuotaState
78+
var exists bool
79+
if quotaState, exists = tracker.state[namespace]; !exists {
80+
quotaState = NewQuotaStateFromResourceQuota(resourceQuota)
81+
tracker.state[namespace] = quotaState
82+
}
83+
// check if both appwrapper requests and limits fit available resource quota
84+
quotaWeights := quotaState.quota.Clone()
85+
quotaWeights.Sub(quotaState.used)
86+
quotaWeights.Sub(quotaState.allocated)
87+
var unAdmittedWeights *WeightsPair
88+
if unAdmittedWeights, exists = tracker.unAdmittedWeightsMap[namespace]; exists {
89+
quotaWeights.Sub(unAdmittedWeights)
90+
}
91+
quotaFits, insufficientResources := appWrapperAskWeights.Fits(quotaWeights)
92+
93+
// mcadLog.Info("QuotaTracker.Satisfies():", "namespace", namespace,
94+
// "QuotaWeights", quotaState.quota, "UsedWeights", quotaState.used,
95+
// "AllocatedWeights", quotaState.allocated, "unAdmittedWeights", unAdmittedWeights,
96+
// "AvailableWeights", quotaWeights, "appWrapperAskWeights", appWrapperAskWeights,
97+
// "quotaFits", quotaFits)
98+
return quotaFits, insufficientResources
99+
}
100+
101+
// Update the QuotaState by the allocated weights of an AppWrapper in a namespace,
102+
// fails if QuotaState does not exist in the QuotaTracker
103+
func (tracker *QuotaTracker) Allocate(namespace string, appWrapperAskWeights *WeightsPair) bool {
104+
if state, exists := tracker.state[namespace]; exists && appWrapperAskWeights != nil {
105+
state.allocated.Add(appWrapperAskWeights)
106+
return true
107+
}
108+
return false
109+
}
110+
111+
// Get requests and limits from AppWrapper specs
112+
func getWeightsPairForAppWrapper(appWrapper *mcadv1beta1.AppWrapper) *WeightsPair {
113+
requests := aggregateRequests(appWrapper)
114+
limits := aggregateLimits(appWrapper)
115+
return NewWeightsPair(requests, limits)
116+
}
117+
118+
// Get requests and limits for both quota and used from ResourceQuota object
119+
func getQuotaAndUsedWeightsPairsForResourceQuota(resourceQuota *v1.ResourceQuota) (quotaWeights *WeightsPair,
120+
usedWeights *WeightsPair) {
121+
quotaWeights = getWeightsPairForResourceList(&resourceQuota.Status.Hard)
122+
usedWeights = getWeightsPairForResourceList(&resourceQuota.Status.Used)
123+
return quotaWeights, usedWeights
124+
}
125+
126+
// Create a pair of Weights for requests and limits
127+
// given in a ResourceList of a ResourceQuota
128+
func getWeightsPairForResourceList(r *v1.ResourceList) *WeightsPair {
129+
requests := Weights{}
130+
limits := Weights{}
131+
for k, v := range *r {
132+
if strings.HasPrefix(k.String(), DefaultResourceLimitsPrefix) {
133+
trimmedName := strings.Replace(k.String(), DefaultResourceLimitsPrefix, "", 1)
134+
if _, exists := limits[v1.ResourceName(trimmedName)]; !exists {
135+
limits[v1.ResourceName(trimmedName)] = v.AsDec()
136+
}
137+
continue
138+
}
139+
if strings.HasPrefix(k.String(), v1.DefaultResourceRequestsPrefix) {
140+
trimmedName := strings.Replace(k.String(), v1.DefaultResourceRequestsPrefix, "", 1)
141+
k = v1.ResourceName(trimmedName)
142+
}
143+
// in case of two keys: requests.xxx and xxx, take the minimum quota of the two
144+
if value, exists := requests[k]; !exists || value.Cmp(v.AsDec()) > 0 {
145+
requests[k] = v.AsDec()
146+
}
147+
}
148+
return NewWeightsPair(requests, limits)
149+
}

0 commit comments

Comments
 (0)