Skip to content

Commit dad8e2f

Browse files
committed
make cluster event queue consumption non-blocking
1 parent d2828e5 commit dad8e2f

File tree

1 file changed

+17
-16
lines changed

1 file changed

+17
-16
lines changed

pkg/controller/postgresql.go

Lines changed: 17 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -142,13 +142,9 @@ func (c *Controller) addCluster(lg *logrus.Entry, clusterName spec.NamespacedNam
142142
return cl
143143
}
144144

145-
func (c *Controller) processEvent(obj interface{}) error {
145+
func (c *Controller) processEvent(event spec.ClusterEvent) {
146146
var clusterName spec.NamespacedName
147147

148-
event, ok := obj.(spec.ClusterEvent)
149-
if !ok {
150-
return fmt.Errorf("could not cast to ClusterEvent")
151-
}
152148
lg := c.logger.WithField("worker", event.WorkerID)
153149

154150
if event.EventType == spec.EventAdd || event.EventType == spec.EventSync {
@@ -166,7 +162,7 @@ func (c *Controller) processEvent(obj interface{}) error {
166162
case spec.EventAdd:
167163
if clusterFound {
168164
lg.Debugf("cluster already exists")
169-
return nil
165+
return
170166
}
171167

172168
lg.Infof("creation of the cluster started")
@@ -177,7 +173,7 @@ func (c *Controller) processEvent(obj interface{}) error {
177173
cl.Error = fmt.Errorf("could not create cluster: %v", err)
178174
lg.Error(cl.Error)
179175

180-
return nil
176+
return
181177
}
182178

183179
lg.Infoln("cluster has been created")
@@ -186,13 +182,13 @@ func (c *Controller) processEvent(obj interface{}) error {
186182

187183
if !clusterFound {
188184
lg.Warnln("cluster does not exist")
189-
return nil
185+
return
190186
}
191187
if err := cl.Update(event.NewSpec); err != nil {
192188
cl.Error = fmt.Errorf("could not update cluster: %v", err)
193189
lg.Error(cl.Error)
194190

195-
return nil
191+
return
196192
}
197193
cl.Error = nil
198194
lg.Infoln("cluster has been updated")
@@ -202,12 +198,12 @@ func (c *Controller) processEvent(obj interface{}) error {
202198
lg.Infoln("Deletion of the cluster started")
203199
if !clusterFound {
204200
lg.Errorf("unknown cluster: %q", clusterName)
205-
return nil
201+
return
206202
}
207203

208204
if err := cl.Delete(); err != nil {
209205
lg.Errorf("could not delete cluster: %v", err)
210-
return nil
206+
return
211207
}
212208

213209
func() {
@@ -238,14 +234,12 @@ func (c *Controller) processEvent(obj interface{}) error {
238234
if err := cl.Sync(); err != nil {
239235
cl.Error = fmt.Errorf("could not sync cluster: %v", err)
240236
lg.Error(cl.Error)
241-
return nil
237+
return
242238
}
243239
cl.Error = nil
244240

245241
lg.Infof("cluster has been synced")
246242
}
247-
248-
return nil
249243
}
250244

251245
func (c *Controller) processClusterEventsQueue(idx int, stopCh <-chan struct{}, wg *sync.WaitGroup) {
@@ -257,13 +251,20 @@ func (c *Controller) processClusterEventsQueue(idx int, stopCh <-chan struct{},
257251
}()
258252

259253
for {
260-
if _, err := c.clusterEventQueues[idx].Pop(cache.PopProcessFunc(c.processEvent)); err != nil {
254+
obj, err := c.clusterEventQueues[idx].Pop(cache.PopProcessFunc(func(interface{}) error { return nil }))
255+
if err != nil {
261256
if err == cache.FIFOClosedError {
262257
return
263258
}
264-
265259
c.logger.Errorf("error when processing cluster events queue: %v", err)
260+
continue
261+
}
262+
event, ok := obj.(spec.ClusterEvent)
263+
if !ok {
264+
c.logger.Errorf("could not cast to ClusterEvent")
266265
}
266+
267+
c.processEvent(event)
267268
}
268269
}
269270

0 commit comments

Comments
 (0)