perf: Cache the pod request calculation in memory (#1825)
jonathan-innis authored Nov 18, 2024
1 parent 65aa1bf commit 6a036cb
Showing 4 changed files with 19 additions and 14 deletions.
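In outline, the commit computes each pending pod's resource request total once per Solve pass, stores it in an in-memory map keyed by pod UID, and passes the cached value through to ExistingNode.Add, NodeClaim.Add, and the queue's sort comparator, instead of recomputing resources.RequestsForPods(pod) at every call site. The following is a minimal, self-contained sketch of that pattern, not the karpenter code itself: sumRequests is a simplified stand-in for resources.RequestsForPods (the real helper covers more cases), and the pod slice is illustrative.

package main

import (
	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/types"
)

// sumRequests is a simplified stand-in for resources.RequestsForPods: it totals
// the container requests of one pod. (The real helper accounts for more, e.g.
// init containers, so treat this as illustrative only.)
func sumRequests(pod *corev1.Pod) corev1.ResourceList {
	total := corev1.ResourceList{}
	for _, c := range pod.Spec.Containers {
		for name, qty := range c.Resources.Requests {
			sum := total[name]
			sum.Add(qty)
			total[name] = sum
		}
	}
	return total
}

func main() {
	var pods []*corev1.Pod // supplied by the caller in practice

	// Compute each pod's request total exactly once, keyed by UID...
	cached := map[types.UID]corev1.ResourceList{}
	for _, p := range pods {
		cached[p.UID] = sumRequests(p)
	}

	// ...then every later scheduling attempt is a map lookup instead of a
	// re-walk of the pod spec.
	for _, p := range pods {
		_ = cached[p.UID]
	}
}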
4 changes: 2 additions & 2 deletions pkg/controllers/provisioning/scheduling/existingnode.go
@@ -63,7 +63,7 @@ func NewExistingNode(n *state.StateNode, topology *Topology, daemonResources v1.
return node
}

-func (n *ExistingNode) Add(ctx context.Context, kubeClient client.Client, pod *v1.Pod) error {
+func (n *ExistingNode) Add(ctx context.Context, kubeClient client.Client, pod *v1.Pod, podRequests v1.ResourceList) error {
// Check Taints
if err := scheduling.Taints(n.Taints()).Tolerates(pod); err != nil {
return err
@@ -84,7 +84,7 @@ func (n *ExistingNode) Add(ctx context.Context, kubeClient client.Client, pod *v

// check resource requests first since that's a pretty likely reason the pod won't schedule on an in-flight
// node, which at this point can't be increased in size
-requests := resources.Merge(n.requests, resources.RequestsForPods(pod))
+requests := resources.Merge(n.requests, podRequests)

if !resources.Fits(requests, n.cachedAvailable) {
return fmt.Errorf("exceeds node resources")
6 changes: 3 additions & 3 deletions pkg/controllers/provisioning/scheduling/nodeclaim.go
@@ -64,7 +64,7 @@ func NewNodeClaim(nodeClaimTemplate *NodeClaimTemplate, topology *Topology, daem
}
}

-func (n *NodeClaim) Add(pod *v1.Pod) error {
+func (n *NodeClaim) Add(pod *v1.Pod, podRequests v1.ResourceList) error {
// Check Taints
if err := scheduling.Taints(n.Spec.Taints).Tolerates(pod); err != nil {
return err
@@ -101,13 +101,13 @@ func (n *NodeClaim) Add(pod *v1.Pod) error {
nodeClaimRequirements.Add(topologyRequirements.Values()...)

// Check instance type combinations
-requests := resources.Merge(n.Spec.Resources.Requests, resources.RequestsForPods(pod))
+requests := resources.Merge(n.Spec.Resources.Requests, podRequests)

filtered := filterInstanceTypesByRequirements(n.InstanceTypeOptions, nodeClaimRequirements, requests)

if len(filtered.remaining) == 0 {
// log the total resources being requested (daemonset + the pod)
-cumulativeResources := resources.Merge(n.daemonResources, resources.RequestsForPods(pod))
+cumulativeResources := resources.Merge(n.daemonResources, podRequests)
return fmt.Errorf("no instance type satisfied resources %s and requirements %s (%s)", resources.String(cumulativeResources), nodeClaimRequirements, filtered.FailureReason())
}

10 changes: 5 additions & 5 deletions pkg/controllers/provisioning/scheduling/queue.go
@@ -34,8 +34,8 @@ type Queue struct {
}

// NewQueue constructs a new queue given the input pods, sorting them to optimize for bin-packing into nodes.
-func NewQueue(pods ...*v1.Pod) *Queue {
-sort.Slice(pods, byCPUAndMemoryDescending(pods))
+func NewQueue(pods []*v1.Pod, podRequests map[types.UID]v1.ResourceList) *Queue {
+sort.Slice(pods, byCPUAndMemoryDescending(pods, podRequests))
return &Queue{
pods: pods,
lastLen: map[types.UID]int{},
@@ -73,13 +73,13 @@ func (q *Queue) List() []*v1.Pod {
return q.pods
}

-func byCPUAndMemoryDescending(pods []*v1.Pod) func(i int, j int) bool {
+func byCPUAndMemoryDescending(pods []*v1.Pod, podRequests map[types.UID]v1.ResourceList) func(i int, j int) bool {
return func(i, j int) bool {
lhsPod := pods[i]
rhsPod := pods[j]

-lhs := resources.RequestsForPods(lhsPod)
-rhs := resources.RequestsForPods(rhsPod)
+lhs := podRequests[lhsPod.UID]
+rhs := podRequests[rhsPod.UID]

cpuCmp := resources.Cmp(lhs[v1.ResourceCPU], rhs[v1.ResourceCPU])
if cpuCmp < 0 {
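For context, the comparator above orders pods by their cached CPU request and then by memory, both descending, so the largest pods are bin-packed first. A standalone illustration of that ordering (not karpenter code; the UIDs and quantities are hypothetical):

package main

import (
	"fmt"
	"sort"

	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/resource"
	"k8s.io/apimachinery/pkg/types"
)

func main() {
	// Hypothetical cached requests, keyed by pod UID, as the scheduler would build them.
	podRequests := map[types.UID]corev1.ResourceList{
		"pod-a": {corev1.ResourceCPU: resource.MustParse("500m"), corev1.ResourceMemory: resource.MustParse("1Gi")},
		"pod-b": {corev1.ResourceCPU: resource.MustParse("2"), corev1.ResourceMemory: resource.MustParse("512Mi")},
		"pod-c": {corev1.ResourceCPU: resource.MustParse("2"), corev1.ResourceMemory: resource.MustParse("4Gi")},
	}
	uids := []types.UID{"pod-a", "pod-b", "pod-c"}

	// Sort by CPU descending, then memory descending, mirroring the intent of
	// byCPUAndMemoryDescending, but reading the cached map instead of pod specs.
	sort.Slice(uids, func(i, j int) bool {
		lhs, rhs := podRequests[uids[i]], podRequests[uids[j]]
		if cmp := lhs.Cpu().Cmp(*rhs.Cpu()); cmp != 0 {
			return cmp > 0
		}
		return lhs.Memory().Cmp(*rhs.Memory()) > 0
	})
	fmt.Println(uids) // [pod-c pod-b pod-a]
}

With the cache in place, each comparison is two map lookups rather than two walks over pod specs, which is what makes the sort cheap for large pod batches.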
13 changes: 9 additions & 4 deletions pkg/controllers/provisioning/scheduling/scheduler.go
@@ -76,6 +76,7 @@ func NewScheduler(ctx context.Context, kubeClient client.Client, nodePools []*v1
topology: topology,
cluster: cluster,
daemonOverhead: getDaemonOverhead(templates, daemonSetPods),
+cachedPodRequests: map[types.UID]corev1.ResourceList{}, // cache pod requests to avoid having to continually recompute this total
recorder: recorder,
preferences: &Preferences{ToleratePreferNoSchedule: toleratePreferNoSchedule},
remainingResources: lo.SliceToMap(nodePools, func(np *v1.NodePool) (string, corev1.ResourceList) {
@@ -94,6 +94,7 @@ type Scheduler struct {
nodeClaimTemplates []*NodeClaimTemplate
remainingResources map[string]corev1.ResourceList // (NodePool name) -> remaining resources for that NodePool
daemonOverhead map[*NodeClaimTemplate]corev1.ResourceList
+cachedPodRequests map[types.UID]corev1.ResourceList // (Pod UID) -> calculated resource requests for the pod
preferences *Preferences
topology *Topology
cluster *state.Cluster
@@ -213,7 +215,10 @@ func (s *Scheduler) Solve(ctx context.Context, pods []*corev1.Pod) Results {
// Reset the metric for the controller, so we don't keep old ids around
UnschedulablePodsCount.DeletePartialMatch(map[string]string{ControllerLabel: injection.GetControllerName(ctx)})
QueueDepth.DeletePartialMatch(map[string]string{ControllerLabel: injection.GetControllerName(ctx)})
-q := NewQueue(pods...)
+for _, p := range pods {
+s.cachedPodRequests[p.UID] = resources.RequestsForPods(p)
+}
+q := NewQueue(pods, s.cachedPodRequests)

startTime := s.clock.Now()
lastLogTime := s.clock.Now()
@@ -262,7 +267,7 @@ func (s *Scheduler) Solve(ctx context.Context, pods []*corev1.Pod) Results {
func (s *Scheduler) add(ctx context.Context, pod *corev1.Pod) error {
// first try to schedule against an in-flight real node
for _, node := range s.existingNodes {
-if err := node.Add(ctx, s.kubeClient, pod); err == nil {
+if err := node.Add(ctx, s.kubeClient, pod, s.cachedPodRequests[pod.UID]); err == nil {
return nil
}
}
@@ -272,7 +277,7 @@ func (s *Scheduler) add(ctx context.Context, pod *corev1.Pod) error {

// Pick existing node that we are about to create
for _, nodeClaim := range s.newNodeClaims {
-if err := nodeClaim.Add(pod); err == nil {
+if err := nodeClaim.Add(pod, s.cachedPodRequests[pod.UID]); err == nil {
return nil
}
}
@@ -293,7 +298,7 @@ func (s *Scheduler) add(ctx context.Context, pod *corev1.Pod) error {
}
}
nodeClaim := NewNodeClaim(nodeClaimTemplate, s.topology, s.daemonOverhead[nodeClaimTemplate], instanceTypes)
-if err := nodeClaim.Add(pod); err != nil {
+if err := nodeClaim.Add(pod, s.cachedPodRequests[pod.UID]); err != nil {
nodeClaim.Destroy() // Ensure we cleanup any changes that we made while mocking out a NodeClaim
errs = multierr.Append(errs, fmt.Errorf("incompatible with nodepool %q, daemonset overhead=%s, %w",
nodeClaimTemplate.NodePoolName,
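Net effect in scheduler.go: Solve fills cachedPodRequests once up front, and every subsequent node.Add / nodeClaim.Add attempt for a pod reuses that entry, so a pod retried against many candidate nodes no longer re-derives its request total each time; the cost is one extra map held on the Scheduler.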
