Skip to content

Commit 0afbc71

Browse files
author
k8s-merge-robot
committed
Merge pull request kubernetes#21030 from janetkuo/deployment-label-adopted
Auto commit by PR queue bot
2 parents 7662a5e + 4699a6d commit 0afbc71

File tree

5 files changed

+365
-52
lines changed

5 files changed

+365
-52
lines changed

pkg/controller/deployment/deployment_controller.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -665,6 +665,10 @@ func (dc *DeploymentController) getNewReplicaSet(deployment extensions.Deploymen
665665
newRevision := strconv.FormatInt(maxOldRevision+1, 10)
666666

667667
existingNewRS, err := deploymentutil.GetNewReplicaSetFromList(deployment, dc.client,
668+
func(namespace string, options api.ListOptions) (*api.PodList, error) {
669+
podList, err := dc.podStore.Pods(namespace).List(options.LabelSelector)
670+
return &podList, err
671+
},
668672
func(namespace string, options api.ListOptions) ([]extensions.ReplicaSet, error) {
669673
return dc.rsStore.ReplicaSets(namespace).List(options.LabelSelector)
670674
})

pkg/util/deployment/deployment.go

Lines changed: 177 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -25,11 +25,14 @@ import (
2525
"k8s.io/kubernetes/pkg/api/unversioned"
2626
"k8s.io/kubernetes/pkg/apis/extensions"
2727
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
28+
unversionedcore "k8s.io/kubernetes/pkg/client/typed/generated/core/unversioned"
29+
unversionedextensions "k8s.io/kubernetes/pkg/client/typed/generated/extensions/unversioned"
2830
"k8s.io/kubernetes/pkg/labels"
2931
"k8s.io/kubernetes/pkg/util/integer"
3032
intstrutil "k8s.io/kubernetes/pkg/util/intstr"
3133
labelsutil "k8s.io/kubernetes/pkg/util/labels"
3234
podutil "k8s.io/kubernetes/pkg/util/pod"
35+
"k8s.io/kubernetes/pkg/util/wait"
3336
)
3437

3538
const (
@@ -55,28 +58,21 @@ func GetOldReplicaSets(deployment extensions.Deployment, c clientset.Interface)
5558
})
5659
}
5760

61+
// TODO: switch this to full namespacers
62+
type rsListFunc func(string, api.ListOptions) ([]extensions.ReplicaSet, error)
63+
type podListFunc func(string, api.ListOptions) (*api.PodList, error)
64+
5865
// GetOldReplicaSetsFromLists returns two sets of old replica sets targeted by the given Deployment; get PodList and ReplicaSetList with input functions.
5966
// Note that the first set of old replica sets doesn't include the ones with no pods, and the second set of old replica sets include all old replica sets.
60-
func GetOldReplicaSetsFromLists(deployment extensions.Deployment, c clientset.Interface, getPodList func(string, api.ListOptions) (*api.PodList, error), getRSList func(string, api.ListOptions) ([]extensions.ReplicaSet, error)) ([]*extensions.ReplicaSet, []*extensions.ReplicaSet, error) {
61-
namespace := deployment.ObjectMeta.Namespace
62-
selector, err := unversioned.LabelSelectorAsSelector(deployment.Spec.Selector)
63-
if err != nil {
64-
return nil, nil, fmt.Errorf("invalid label selector: %v", err)
65-
}
66-
67-
// 1. Find all pods whose labels match deployment.Spec.Selector
68-
options := api.ListOptions{LabelSelector: selector}
69-
podList, err := getPodList(namespace, options)
70-
if err != nil {
71-
return nil, nil, fmt.Errorf("error listing pods: %v", err)
72-
}
73-
// 2. Find the corresponding replica sets for pods in podList.
67+
func GetOldReplicaSetsFromLists(deployment extensions.Deployment, c clientset.Interface, getPodList podListFunc, getRSList rsListFunc) ([]*extensions.ReplicaSet, []*extensions.ReplicaSet, error) {
68+
// Find all pods whose labels match deployment.Spec.Selector, and corresponding replica sets for pods in podList.
69+
// All pods and replica sets are labeled with pod-template-hash to prevent overlapping
7470
// TODO: Right now we list all replica sets and then filter. We should add an API for this.
7571
oldRSs := map[string]extensions.ReplicaSet{}
7672
allOldRSs := map[string]extensions.ReplicaSet{}
77-
rsList, err := getRSList(namespace, options)
73+
rsList, podList, err := rsAndPodsWithHashKeySynced(deployment, c, getRSList, getPodList)
7874
if err != nil {
79-
return nil, nil, fmt.Errorf("error listing replica sets: %v", err)
75+
return nil, nil, fmt.Errorf("error labeling replica sets and pods with pod-template-hash: %v", err)
8076
}
8177
newRSTemplate := GetNewReplicaSetTemplate(deployment)
8278
for _, pod := range podList.Items {
@@ -113,6 +109,9 @@ func GetOldReplicaSetsFromLists(deployment extensions.Deployment, c clientset.In
113109
// Returns nil if the new replica set doesn't exist yet.
114110
func GetNewReplicaSet(deployment extensions.Deployment, c clientset.Interface) (*extensions.ReplicaSet, error) {
115111
return GetNewReplicaSetFromList(deployment, c,
112+
func(namespace string, options api.ListOptions) (*api.PodList, error) {
113+
return c.Core().Pods(namespace).List(options)
114+
},
116115
func(namespace string, options api.ListOptions) ([]extensions.ReplicaSet, error) {
117116
rsList, err := c.Extensions().ReplicaSets(namespace).List(options)
118117
return rsList.Items, err
@@ -121,14 +120,8 @@ func GetNewReplicaSet(deployment extensions.Deployment, c clientset.Interface) (
121120

122121
// GetNewReplicaSetFromList returns a replica set that matches the intent of the given deployment; get ReplicaSetList with the input function.
123122
// Returns nil if the new replica set doesn't exist yet.
124-
func GetNewReplicaSetFromList(deployment extensions.Deployment, c clientset.Interface, getRSList func(string, api.ListOptions) ([]extensions.ReplicaSet, error)) (*extensions.ReplicaSet, error) {
125-
namespace := deployment.ObjectMeta.Namespace
126-
selector, err := unversioned.LabelSelectorAsSelector(deployment.Spec.Selector)
127-
if err != nil {
128-
return nil, fmt.Errorf("invalid label selector: %v", err)
129-
}
130-
131-
rsList, err := getRSList(namespace, api.ListOptions{LabelSelector: selector})
123+
func GetNewReplicaSetFromList(deployment extensions.Deployment, c clientset.Interface, getPodList podListFunc, getRSList rsListFunc) (*extensions.ReplicaSet, error) {
124+
rsList, _, err := rsAndPodsWithHashKeySynced(deployment, c, getRSList, getPodList)
132125
if err != nil {
133126
return nil, fmt.Errorf("error listing ReplicaSets: %v", err)
134127
}
@@ -144,6 +137,166 @@ func GetNewReplicaSetFromList(deployment extensions.Deployment, c clientset.Inte
144137
return nil, nil
145138
}
146139

140+
// rsAndPodsWithHashKeySynced returns the RSs and pods the given deployment targets, with pod-template-hash information synced.
141+
func rsAndPodsWithHashKeySynced(deployment extensions.Deployment, c clientset.Interface, getRSList rsListFunc, getPodList podListFunc) ([]extensions.ReplicaSet, *api.PodList, error) {
142+
namespace := deployment.Namespace
143+
selector, err := unversioned.LabelSelectorAsSelector(deployment.Spec.Selector)
144+
if err != nil {
145+
return nil, nil, err
146+
}
147+
options := api.ListOptions{LabelSelector: selector}
148+
rsList, err := getRSList(namespace, options)
149+
if err != nil {
150+
return nil, nil, err
151+
}
152+
syncedRSList := []extensions.ReplicaSet{}
153+
for _, rs := range rsList {
154+
// Add pod-template-hash information if it's not in the RS.
155+
// Otherwise, new RS produced by Deployment will overlap with pre-existing ones
156+
// that aren't constrained by the pod-template-hash.
157+
syncedRS, err := addHashKeyToRSAndPods(deployment, c, rs, getPodList)
158+
if err != nil {
159+
return nil, nil, err
160+
}
161+
syncedRSList = append(syncedRSList, *syncedRS)
162+
}
163+
syncedPodList, err := getPodList(namespace, options)
164+
if err != nil {
165+
return nil, nil, err
166+
}
167+
return syncedRSList, syncedPodList, nil
168+
}
169+
170+
// addHashKeyToRSAndPods adds pod-template-hash information to the given rs, if it's not already there, with the following steps:
171+
// 1. Add hash label to the rs's pod template, and make sure the controller sees this update so that no orphaned pods will be created
172+
// 2. Add hash label to all pods this rs owns
173+
// 3. Add hash label to the rs's label and selector
174+
func addHashKeyToRSAndPods(deployment extensions.Deployment, c clientset.Interface, rs extensions.ReplicaSet, getPodList podListFunc) (updatedRS *extensions.ReplicaSet, err error) {
175+
// If the rs already has the new hash label in its selector, it's done syncing
176+
namespace := deployment.Namespace
177+
hash := fmt.Sprintf("%d", podutil.GetPodTemplateSpecHash(api.PodTemplateSpec{
178+
ObjectMeta: rs.Spec.Template.ObjectMeta,
179+
Spec: rs.Spec.Template.Spec,
180+
}))
181+
if labelsutil.SelectorHasLabel(rs.Spec.Selector, extensions.DefaultDeploymentUniqueLabelKey) {
182+
return &rs, nil
183+
}
184+
// 1. Add hash template label to the rs. This ensures that any newly created pods will have the new label.
185+
if len(rs.Spec.Template.Labels[extensions.DefaultDeploymentUniqueLabelKey]) == 0 {
186+
updatedRS, err = updateRSWithRetries(c.Extensions().ReplicaSets(namespace), &rs, func(updated *extensions.ReplicaSet) {
187+
updated.Spec.Template.Labels = labelsutil.AddLabel(updated.Spec.Template.Labels, extensions.DefaultDeploymentUniqueLabelKey, hash)
188+
})
189+
if err != nil {
190+
return nil, err
191+
}
192+
}
193+
// Make sure rs pod template is updated so that it won't create pods without the new label (orphaned pods).
194+
if updatedRS.Generation > updatedRS.Status.ObservedGeneration {
195+
if err = waitForReplicaSetUpdated(c, updatedRS.Generation, namespace, rs.Name); err != nil {
196+
return nil, err
197+
}
198+
}
199+
200+
// 2. Update all pods managed by the rs to have the new hash label, so they will be correctly adopted.
201+
selector, err := unversioned.LabelSelectorAsSelector(rs.Spec.Selector)
202+
if err != nil {
203+
return nil, err
204+
}
205+
options := api.ListOptions{LabelSelector: selector}
206+
podList, err := getPodList(namespace, options)
207+
if err != nil {
208+
return nil, err
209+
}
210+
if err = labelPodsWithHash(podList, c, namespace, hash); err != nil {
211+
return nil, err
212+
}
213+
214+
// 3. Update rs label and selector to include the new hash label
215+
// Copy the old selector, so that we can scrub out any orphaned pods
216+
if updatedRS, err = updateRSWithRetries(c.Extensions().ReplicaSets(namespace), &rs, func(updated *extensions.ReplicaSet) {
217+
updated.Labels = labelsutil.AddLabel(updated.Labels, extensions.DefaultDeploymentUniqueLabelKey, hash)
218+
updated.Spec.Selector = labelsutil.AddLabelToSelector(updated.Spec.Selector, extensions.DefaultDeploymentUniqueLabelKey, hash)
219+
}); err != nil {
220+
return nil, err
221+
}
222+
223+
// TODO: look for orphaned pods and label them in the background somewhere else periodically
224+
225+
return updatedRS, nil
226+
}
227+
228+
func waitForReplicaSetUpdated(c clientset.Interface, desiredGeneration int64, namespace, name string) error {
229+
return wait.Poll(10*time.Millisecond, 1*time.Minute, func() (bool, error) {
230+
rs, err := c.Extensions().ReplicaSets(namespace).Get(name)
231+
if err != nil {
232+
return false, err
233+
}
234+
return rs.Status.ObservedGeneration >= desiredGeneration, nil
235+
})
236+
}
237+
238+
// labelPodsWithHash labels all pods in the given podList with the new hash label.
239+
func labelPodsWithHash(podList *api.PodList, c clientset.Interface, namespace, hash string) error {
240+
for _, pod := range podList.Items {
241+
// Only label the pod that doesn't already have the new hash
242+
if pod.Labels[extensions.DefaultDeploymentUniqueLabelKey] != hash {
243+
if _, err := updatePodWithRetries(c.Core().Pods(namespace), &pod, func(updated *api.Pod) {
244+
pod.Labels = labelsutil.AddLabel(pod.Labels, extensions.DefaultDeploymentUniqueLabelKey, hash)
245+
}); err != nil {
246+
return err
247+
}
248+
}
249+
}
250+
return nil
251+
}
252+
253+
// TODO: use client library instead when it starts to support update retries
254+
// see https://github.com/kubernetes/kubernetes/issues/21479
255+
type updateRSFunc func(rs *extensions.ReplicaSet)
256+
257+
func updateRSWithRetries(rsClient unversionedextensions.ReplicaSetInterface, rs *extensions.ReplicaSet, applyUpdate updateRSFunc) (*extensions.ReplicaSet, error) {
258+
var err error
259+
oldRs := rs
260+
err = wait.Poll(10*time.Millisecond, 1*time.Minute, func() (bool, error) {
261+
// Apply the update, then attempt to push it to the apiserver.
262+
applyUpdate(rs)
263+
if rs, err = rsClient.Update(rs); err == nil {
264+
// rs contains the latest controller post update
265+
return true, nil
266+
}
267+
// Update the controller with the latest resource version, if the update failed we
268+
// can't trust rs so use oldRs.Name.
269+
if rs, err = rsClient.Get(oldRs.Name); err != nil {
270+
// The Get failed: Value in rs cannot be trusted.
271+
rs = oldRs
272+
}
273+
// The Get passed: rs contains the latest controller, expect a poll for the update.
274+
return false, nil
275+
})
276+
// If the error is non-nil the returned controller cannot be trusted, if it is nil, the returned
277+
// controller contains the applied update.
278+
return rs, err
279+
}
280+
281+
type updatePodFunc func(pod *api.Pod)
282+
283+
func updatePodWithRetries(podClient unversionedcore.PodInterface, pod *api.Pod, applyUpdate updatePodFunc) (*api.Pod, error) {
284+
var err error
285+
oldPod := pod
286+
err = wait.Poll(10*time.Millisecond, 1*time.Minute, func() (bool, error) {
287+
// Apply the update, then attempt to push it to the apiserver.
288+
applyUpdate(pod)
289+
if pod, err = podClient.Update(pod); err == nil {
290+
return true, nil
291+
}
292+
if pod, err = podClient.Get(oldPod.Name); err != nil {
293+
pod = oldPod
294+
}
295+
return false, nil
296+
})
297+
return pod, err
298+
}
299+
147300
// Returns the desired PodTemplateSpec for the new ReplicaSet corresponding to the given ReplicaSet.
148301
func GetNewReplicaSetTemplate(deployment extensions.Deployment) api.PodTemplateSpec {
149302
// newRS will have the same template as in deployment spec, plus a unique label in some cases.

0 commit comments

Comments
 (0)