Skip to content

Commit 83760eb

Browse files
committed
discard cluster events from the queue on cluster delete;
delete cluster from the clusters map before deleting cluster itself
1 parent f2c2302 commit 83760eb

File tree

1 file changed

+32
-11
lines changed

1 file changed

+32
-11
lines changed

pkg/controller/postgresql.go

Lines changed: 32 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -193,26 +193,20 @@ func (c *Controller) processEvent(event spec.ClusterEvent) {
193193
cl.Error = nil
194194
lg.Infoln("cluster has been updated")
195195
case spec.EventDelete:
196-
teamName := strings.ToLower(cl.Spec.TeamID)
197-
198-
lg.Infoln("Deletion of the cluster started")
199196
if !clusterFound {
200197
lg.Errorf("unknown cluster: %q", clusterName)
201198
return
202199
}
200+
lg.Infoln("deletion of the cluster started")
203201

204-
if err := cl.Delete(); err != nil {
205-
lg.Errorf("could not delete cluster: %v", err)
206-
return
207-
}
208-
202+
teamName := strings.ToLower(cl.Spec.TeamID)
209203
func() {
210204
defer c.clustersMu.Unlock()
211205
c.clustersMu.Lock()
212206

213207
delete(c.clusters, clusterName)
214208
delete(c.clusterLogs, clusterName)
215-
for i, val := range c.teamClusters[teamName] { // on relativel
209+
for i, val := range c.teamClusters[teamName] {
216210
if val == clusterName {
217211
copy(c.teamClusters[teamName][i:], c.teamClusters[teamName][i+1:])
218212
c.teamClusters[teamName][len(c.teamClusters[teamName])-1] = spec.NamespacedName{}
@@ -222,6 +216,11 @@ func (c *Controller) processEvent(event spec.ClusterEvent) {
222216
}
223217
}()
224218

219+
if err := cl.Delete(); err != nil {
220+
lg.Errorf("could not delete cluster: %v", err)
221+
return
222+
}
223+
225224
lg.Infof("cluster has been deleted")
226225
case spec.EventSync:
227226
lg.Infof("syncing of the cluster started")
@@ -305,13 +304,35 @@ func (c *Controller) queueClusterEvent(old, new *spec.Postgresql, eventType spec
305304
NewSpec: new,
306305
WorkerID: workerID,
307306
}
308-
//TODO: if we delete cluster, discard all the previous events for the cluster
309307

310308
lg := c.logger.WithField("worker", workerID).WithField("cluster-name", clusterName)
311309
if err := c.clusterEventQueues[workerID].Add(clusterEvent); err != nil {
312-
lg.Errorf("error when queueing cluster event: %v", clusterEvent)
310+
lg.Errorf("error while queueing cluster event: %v", clusterEvent)
313311
}
314312
lg.Infof("%q event has been queued", eventType)
313+
314+
if eventType != spec.EventDelete {
315+
return
316+
}
317+
318+
for _, evType := range []spec.EventType{spec.EventAdd, spec.EventSync, spec.EventUpdate} {
319+
obj, exists, err := c.clusterEventQueues[workerID].GetByKey(queueClusterKey(evType, uid))
320+
if err != nil {
321+
lg.Warningf("could not get event from the queue: %v", err)
322+
continue
323+
}
324+
325+
if !exists {
326+
continue
327+
}
328+
329+
err = c.clusterEventQueues[workerID].Delete(obj)
330+
if err != nil {
331+
lg.Warningf("could not delete event from the queue: %v", err)
332+
} else {
333+
lg.Debugf("event %q has been discarded for the cluster", evType)
334+
}
335+
}
315336
}
316337

317338
func (c *Controller) postgresqlAdd(obj interface{}) {

0 commit comments

Comments
 (0)