Skip to content

Commit c557027

Browse files
authored
Fix cluster event queue consumption
2 parents 5a7a3fe + dad8e2f commit c557027

File tree

6 files changed

+60
-60
lines changed

6 files changed

+60
-60
lines changed

pkg/cluster/cluster.go

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ type Config struct {
4141
}
4242

4343
type kubeResources struct {
44-
Service map[postgresRole]*v1.Service
44+
Services map[postgresRole]*v1.Service
4545
Endpoint *v1.Endpoints
4646
Secrets map[types.UID]*v1.Secret
4747
Statefulset *v1beta1.StatefulSet
@@ -96,7 +96,7 @@ func New(cfg Config, kubeClient k8sutil.KubernetesClient, pgSpec spec.Postgresql
9696
pgUsers: make(map[string]spec.PgUser),
9797
systemUsers: make(map[string]spec.PgUser),
9898
podSubscribers: make(map[spec.NamespacedName]chan spec.PodEvent),
99-
kubeResources: kubeResources{Secrets: make(map[types.UID]*v1.Secret), Service: make(map[postgresRole]*v1.Service)},
99+
kubeResources: kubeResources{Secrets: make(map[types.UID]*v1.Secret), Services: make(map[postgresRole]*v1.Service)},
100100
masterLess: false,
101101
userSyncStrategy: users.DefaultUserSyncStrategy{},
102102
deleteOptions: &metav1.DeleteOptions{OrphanDependents: &orphanDependents},
@@ -246,11 +246,11 @@ func (c *Cluster) Create() error {
246246

247247
func (c *Cluster) sameServiceWith(role postgresRole, service *v1.Service) (match bool, reason string) {
248248
//TODO: improve comparison
249-
if c.Service[role].Spec.Type != service.Spec.Type {
249+
if c.Services[role].Spec.Type != service.Spec.Type {
250250
return false, fmt.Sprintf("new %s service's type %q doesn't match the current one %q",
251-
role, service.Spec.Type, c.Service[role].Spec.Type)
251+
role, service.Spec.Type, c.Services[role].Spec.Type)
252252
}
253-
oldSourceRanges := c.Service[role].Spec.LoadBalancerSourceRanges
253+
oldSourceRanges := c.Services[role].Spec.LoadBalancerSourceRanges
254254
newSourceRanges := service.Spec.LoadBalancerSourceRanges
255255
/* work around Kubernetes 1.6 serializing [] as nil. See https://github.com/kubernetes/kubernetes/issues/43203 */
256256
if (len(oldSourceRanges) == 0) && (len(newSourceRanges) == 0) {
@@ -260,7 +260,7 @@ func (c *Cluster) sameServiceWith(role postgresRole, service *v1.Service) (match
260260
return false, fmt.Sprintf("new %s service's LoadBalancerSourceRange doesn't match the current one", role)
261261
}
262262

263-
oldDNSAnnotation := c.Service[role].Annotations[constants.ZalandoDNSNameAnnotation]
263+
oldDNSAnnotation := c.Services[role].Annotations[constants.ZalandoDNSNameAnnotation]
264264
newDNSAnnotation := service.Annotations[constants.ZalandoDNSNameAnnotation]
265265
if oldDNSAnnotation != newDNSAnnotation {
266266
return false, fmt.Sprintf("new %s service's %q annotation doesn't match the current one", role, constants.ZalandoDNSNameAnnotation)
@@ -445,12 +445,12 @@ func (c *Cluster) Update(newSpec *spec.Postgresql) error {
445445
}
446446
newService := c.generateService(role, &newSpec.Spec)
447447
if match, reason := c.sameServiceWith(role, newService); !match {
448-
c.logServiceChanges(role, c.Service[role], newService, true, reason)
448+
c.logServiceChanges(role, c.Services[role], newService, true, reason)
449449
if err := c.updateService(role, newService); err != nil {
450450
c.setStatus(spec.ClusterStatusUpdateFailed)
451451
return fmt.Errorf("could not update %s service: %v", role, err)
452452
}
453-
c.logger.Infof("%s service %q has been updated", role, util.NameFromMeta(c.Service[role].ObjectMeta))
453+
c.logger.Infof("%s service %q has been updated", role, util.NameFromMeta(c.Services[role].ObjectMeta))
454454
}
455455
}
456456

pkg/cluster/resources.go

Lines changed: 25 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -31,9 +31,9 @@ func (c *Cluster) loadResources() error {
3131
for i, svc := range services.Items {
3232
switch postgresRole(svc.Labels[c.OpConfig.PodRoleLabel]) {
3333
case replica:
34-
c.Service[replica] = &services.Items[i]
34+
c.Services[replica] = &services.Items[i]
3535
default:
36-
c.Service[master] = &services.Items[i]
36+
c.Services[master] = &services.Items[i]
3737
}
3838
}
3939

@@ -91,7 +91,7 @@ func (c *Cluster) listResources() error {
9191
c.logger.Infof("found endpoint: %q (uid: %q)", util.NameFromMeta(c.Endpoint.ObjectMeta), c.Endpoint.UID)
9292
}
9393

94-
for role, service := range c.Service {
94+
for role, service := range c.Services {
9595
c.logger.Infof("found %s service: %q (uid: %q)", role, util.NameFromMeta(service.ObjectMeta), service.UID)
9696
}
9797

@@ -231,7 +231,7 @@ func (c *Cluster) deleteStatefulSet() error {
231231
}
232232

233233
func (c *Cluster) createService(role postgresRole) (*v1.Service, error) {
234-
if c.Service[role] != nil {
234+
if c.Services[role] != nil {
235235
return nil, fmt.Errorf("service already exists in the cluster")
236236
}
237237
serviceSpec := c.generateService(role, &c.Spec)
@@ -241,18 +241,18 @@ func (c *Cluster) createService(role postgresRole) (*v1.Service, error) {
241241
return nil, err
242242
}
243243

244-
c.Service[role] = service
244+
c.Services[role] = service
245245
return service, nil
246246
}
247247

248248
func (c *Cluster) updateService(role postgresRole, newService *v1.Service) error {
249-
if c.Service[role] == nil {
249+
if c.Services[role] == nil {
250250
return fmt.Errorf("there is no service in the cluster")
251251
}
252-
serviceName := util.NameFromMeta(c.Service[role].ObjectMeta)
252+
serviceName := util.NameFromMeta(c.Services[role].ObjectMeta)
253253
endpointName := util.NameFromMeta(c.Endpoint.ObjectMeta)
254254
// TODO: check if it possible to change the service type with a patch in future versions of Kubernetes
255-
if newService.Spec.Type != c.Service[role].Spec.Type {
255+
if newService.Spec.Type != c.Services[role].Spec.Type {
256256
// service type has changed, need to replace the service completely.
257257
// we cannot use just pach the current service, since it may contain attributes incompatible with the new type.
258258
var (
@@ -263,12 +263,12 @@ func (c *Cluster) updateService(role postgresRole, newService *v1.Service) error
263263
if role == master {
264264
// for the master service we need to re-create the endpoint as well. Get the up-to-date version of
265265
// the addresses stored in it before the service is deleted (deletion of the service removes the endpooint)
266-
currentEndpoint, err = c.KubeClient.Endpoints(c.Service[role].Namespace).Get(c.Service[role].Name, metav1.GetOptions{})
266+
currentEndpoint, err = c.KubeClient.Endpoints(c.Services[role].Namespace).Get(c.Services[role].Name, metav1.GetOptions{})
267267
if err != nil {
268268
return fmt.Errorf("could not get current cluster endpoints: %v", err)
269269
}
270270
}
271-
err = c.KubeClient.Services(c.Service[role].Namespace).Delete(c.Service[role].Name, c.deleteOptions)
271+
err = c.KubeClient.Services(c.Services[role].Namespace).Delete(c.Services[role].Name, c.deleteOptions)
272272
if err != nil {
273273
return fmt.Errorf("could not delete service %q: %v", serviceName, err)
274274
}
@@ -277,11 +277,11 @@ func (c *Cluster) updateService(role postgresRole, newService *v1.Service) error
277277
if err != nil {
278278
return fmt.Errorf("could not create service %q: %v", serviceName, err)
279279
}
280-
c.Service[role] = svc
280+
c.Services[role] = svc
281281
if role == master {
282282
// create the new endpoint using the addresses obtained from the previous one
283283
endpointSpec := c.generateMasterEndpoints(currentEndpoint.Subsets)
284-
ep, err := c.KubeClient.Endpoints(c.Service[role].Namespace).Create(endpointSpec)
284+
ep, err := c.KubeClient.Endpoints(c.Services[role].Namespace).Create(endpointSpec)
285285
if err != nil {
286286
return fmt.Errorf("could not create endpoint %q: %v", endpointName, err)
287287
}
@@ -293,8 +293,8 @@ func (c *Cluster) updateService(role postgresRole, newService *v1.Service) error
293293
if len(newService.ObjectMeta.Annotations) > 0 {
294294
annotationsPatchData := metadataAnnotationsPatch(newService.ObjectMeta.Annotations)
295295

296-
_, err := c.KubeClient.Services(c.Service[role].Namespace).Patch(
297-
c.Service[role].Name,
296+
_, err := c.KubeClient.Services(c.Services[role].Namespace).Patch(
297+
c.Services[role].Name,
298298
types.StrategicMergePatchType,
299299
[]byte(annotationsPatchData), "")
300300

@@ -308,30 +308,30 @@ func (c *Cluster) updateService(role postgresRole, newService *v1.Service) error
308308
return fmt.Errorf("could not form patch for the service %q: %v", serviceName, err)
309309
}
310310

311-
svc, err := c.KubeClient.Services(c.Service[role].Namespace).Patch(
312-
c.Service[role].Name,
311+
svc, err := c.KubeClient.Services(c.Services[role].Namespace).Patch(
312+
c.Services[role].Name,
313313
types.MergePatchType,
314314
patchData, "")
315315
if err != nil {
316316
return fmt.Errorf("could not patch service %q: %v", serviceName, err)
317317
}
318-
c.Service[role] = svc
318+
c.Services[role] = svc
319319

320320
return nil
321321
}
322322

323323
func (c *Cluster) deleteService(role postgresRole) error {
324324
c.logger.Debugf("deleting service %s", role)
325-
if c.Service[role] == nil {
325+
if c.Services[role] == nil {
326326
return fmt.Errorf("there is no %s service in the cluster", role)
327327
}
328-
service := c.Service[role]
328+
service := c.Services[role]
329329
err := c.KubeClient.Services(service.Namespace).Delete(service.Name, c.deleteOptions)
330330
if err != nil {
331331
return err
332332
}
333333
c.logger.Infof("%s service %q has been deleted", role, util.NameFromMeta(service.ObjectMeta))
334-
c.Service[role] = nil
334+
c.Services[role] = nil
335335
return nil
336336
}
337337

@@ -372,9 +372,9 @@ func (c *Cluster) applySecrets() error {
372372
secret, err := c.KubeClient.Secrets(secretSpec.Namespace).Create(secretSpec)
373373
if k8sutil.ResourceAlreadyExists(err) {
374374
var userMap map[string]spec.PgUser
375-
curSecret, err := c.KubeClient.Secrets(secretSpec.Namespace).Get(secretSpec.Name, metav1.GetOptions{})
376-
if err != nil {
377-
return fmt.Errorf("could not get current secret: %v", err)
375+
curSecret, err2 := c.KubeClient.Secrets(secretSpec.Namespace).Get(secretSpec.Name, metav1.GetOptions{})
376+
if err2 != nil {
377+
return fmt.Errorf("could not get current secret: %v", err2)
378378
}
379379
c.logger.Debugf("secret %q already exists, fetching it's password", util.NameFromMeta(curSecret.ObjectMeta))
380380
if secretUsername == c.systemUsers[constants.SuperuserKeyName].Name {
@@ -422,12 +422,12 @@ func (c *Cluster) createRoles() (err error) {
422422

423423
// GetServiceMaster returns cluster's kubernetes master Service
424424
func (c *Cluster) GetServiceMaster() *v1.Service {
425-
return c.Service[master]
425+
return c.Services[master]
426426
}
427427

428428
// GetServiceReplica returns cluster's kubernetes replica Service
429429
func (c *Cluster) GetServiceReplica() *v1.Service {
430-
return c.Service[replica]
430+
return c.Services[replica]
431431
}
432432

433433
// GetEndpoint returns cluster's kubernetes Endpoint

pkg/cluster/sync.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ func (c *Cluster) Sync() error {
4343
c.logger.Debugf("syncing services")
4444
for _, role := range []postgresRole{master, replica} {
4545
if role == replica && !c.Spec.ReplicaLoadBalancer {
46-
if c.Service[role] != nil {
46+
if c.Services[role] != nil {
4747
// delete the left over replica service
4848
if err := c.deleteService(role); err != nil {
4949
return fmt.Errorf("could not delete obsolete %s service: %v", role, err)
@@ -82,7 +82,7 @@ func (c *Cluster) Sync() error {
8282

8383
func (c *Cluster) syncService(role postgresRole) error {
8484
cSpec := c.Spec
85-
if c.Service[role] == nil {
85+
if c.Services[role] == nil {
8686
c.logger.Infof("could not find the cluster's %s service", role)
8787
svc, err := c.createService(role)
8888
if err != nil {
@@ -98,7 +98,7 @@ func (c *Cluster) syncService(role postgresRole) error {
9898
if match {
9999
return nil
100100
}
101-
c.logServiceChanges(role, c.Service[role], desiredSvc, false, reason)
101+
c.logServiceChanges(role, c.Services[role], desiredSvc, false, reason)
102102

103103
if err := c.updateService(role, desiredSvc); err != nil {
104104
return fmt.Errorf("could not update %s service to match desired state: %v", role, err)

pkg/cluster/util.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -235,13 +235,13 @@ func (c *Cluster) waitPodLabelsReady() error {
235235

236236
err = retryutil.Retry(c.OpConfig.ResourceCheckInterval, c.OpConfig.ResourceCheckTimeout,
237237
func() (bool, error) {
238-
masterPods, err := c.KubeClient.Pods(namespace).List(masterListOption)
239-
if err != nil {
240-
return false, err
238+
masterPods, err2 := c.KubeClient.Pods(namespace).List(masterListOption)
239+
if err2 != nil {
240+
return false, err2
241241
}
242-
replicaPods, err := c.KubeClient.Pods(namespace).List(replicaListOption)
243-
if err != nil {
244-
return false, err
242+
replicaPods, err2 := c.KubeClient.Pods(namespace).List(replicaListOption)
243+
if err2 != nil {
244+
return false, err2
245245
}
246246
if len(masterPods.Items) > 1 {
247247
return false, fmt.Errorf("too many masters")

pkg/controller/postgresql.go

Lines changed: 17 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -142,13 +142,9 @@ func (c *Controller) addCluster(lg *logrus.Entry, clusterName spec.NamespacedNam
142142
return cl
143143
}
144144

145-
func (c *Controller) processEvent(obj interface{}) error {
145+
func (c *Controller) processEvent(event spec.ClusterEvent) {
146146
var clusterName spec.NamespacedName
147147

148-
event, ok := obj.(spec.ClusterEvent)
149-
if !ok {
150-
return fmt.Errorf("could not cast to ClusterEvent")
151-
}
152148
lg := c.logger.WithField("worker", event.WorkerID)
153149

154150
if event.EventType == spec.EventAdd || event.EventType == spec.EventSync {
@@ -166,7 +162,7 @@ func (c *Controller) processEvent(obj interface{}) error {
166162
case spec.EventAdd:
167163
if clusterFound {
168164
lg.Debugf("cluster already exists")
169-
return nil
165+
return
170166
}
171167

172168
lg.Infof("creation of the cluster started")
@@ -177,7 +173,7 @@ func (c *Controller) processEvent(obj interface{}) error {
177173
cl.Error = fmt.Errorf("could not create cluster: %v", err)
178174
lg.Error(cl.Error)
179175

180-
return nil
176+
return
181177
}
182178

183179
lg.Infoln("cluster has been created")
@@ -186,13 +182,13 @@ func (c *Controller) processEvent(obj interface{}) error {
186182

187183
if !clusterFound {
188184
lg.Warnln("cluster does not exist")
189-
return nil
185+
return
190186
}
191187
if err := cl.Update(event.NewSpec); err != nil {
192188
cl.Error = fmt.Errorf("could not update cluster: %v", err)
193189
lg.Error(cl.Error)
194190

195-
return nil
191+
return
196192
}
197193
cl.Error = nil
198194
lg.Infoln("cluster has been updated")
@@ -202,12 +198,12 @@ func (c *Controller) processEvent(obj interface{}) error {
202198
lg.Infoln("Deletion of the cluster started")
203199
if !clusterFound {
204200
lg.Errorf("unknown cluster: %q", clusterName)
205-
return nil
201+
return
206202
}
207203

208204
if err := cl.Delete(); err != nil {
209205
lg.Errorf("could not delete cluster: %v", err)
210-
return nil
206+
return
211207
}
212208

213209
func() {
@@ -238,14 +234,12 @@ func (c *Controller) processEvent(obj interface{}) error {
238234
if err := cl.Sync(); err != nil {
239235
cl.Error = fmt.Errorf("could not sync cluster: %v", err)
240236
lg.Error(cl.Error)
241-
return nil
237+
return
242238
}
243239
cl.Error = nil
244240

245241
lg.Infof("cluster has been synced")
246242
}
247-
248-
return nil
249243
}
250244

251245
func (c *Controller) processClusterEventsQueue(idx int, stopCh <-chan struct{}, wg *sync.WaitGroup) {
@@ -257,13 +251,20 @@ func (c *Controller) processClusterEventsQueue(idx int, stopCh <-chan struct{},
257251
}()
258252

259253
for {
260-
if _, err := c.clusterEventQueues[idx].Pop(cache.PopProcessFunc(c.processEvent)); err != nil {
254+
obj, err := c.clusterEventQueues[idx].Pop(cache.PopProcessFunc(func(interface{}) error { return nil }))
255+
if err != nil {
261256
if err == cache.FIFOClosedError {
262257
return
263258
}
264-
265259
c.logger.Errorf("error when processing cluster events queue: %v", err)
260+
continue
261+
}
262+
event, ok := obj.(spec.ClusterEvent)
263+
if !ok {
264+
c.logger.Errorf("could not cast to ClusterEvent")
266265
}
266+
267+
c.processEvent(event)
267268
}
268269
}
269270

pkg/util/util.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,14 +4,13 @@ import (
44
"crypto/md5"
55
"encoding/hex"
66
"math/rand"
7+
"regexp"
78
"strings"
89
"time"
910

1011
"github.com/motomux/pretty"
1112
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
1213

13-
"regexp"
14-
1514
"github.com/zalando-incubator/postgres-operator/pkg/spec"
1615
)
1716

0 commit comments

Comments
 (0)