Skip to content

Commit 5ff6d6a

Browse files
authored
Discard cluster events from the queue on cluster delete
2 parents c557027 + 83760eb commit 5ff6d6a

File tree

4 files changed

+41
-15
lines changed

4 files changed

+41
-15
lines changed

pkg/apiserver/apiserver.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@ func (s *Server) Run(stopCh <-chan struct{}, wg *sync.WaitGroup) {
9393
s.logger.Fatalf("Could not start http server: %v", err)
9494
}
9595
}()
96-
s.logger.Infof("Listening on %s", s.http.Addr)
96+
s.logger.Infof("listening on %s", s.http.Addr)
9797

9898
<-stopCh
9999

pkg/cluster/cluster.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -132,12 +132,12 @@ func (c *Cluster) setStatus(status spec.PostgresStatus) {
132132
DoRaw()
133133

134134
if k8sutil.ResourceNotFound(err) {
135-
c.logger.Warningf("could not set status for the non-existing cluster")
135+
c.logger.Warningf("could not set %q status for the non-existing cluster", status)
136136
return
137137
}
138138

139139
if err != nil {
140-
c.logger.Warningf("could not set status for the cluster: %v", err)
140+
c.logger.Warningf("could not set %q status for the cluster: %v", status, err)
141141
}
142142
}
143143

pkg/controller/controller.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66

77
"github.com/Sirupsen/logrus"
88
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
9+
"k8s.io/apimachinery/pkg/types"
910
"k8s.io/client-go/pkg/api/v1"
1011
"k8s.io/client-go/rest"
1112
"k8s.io/client-go/tools/cache"
@@ -169,7 +170,7 @@ func (c *Controller) initController() {
169170
return "", fmt.Errorf("could not cast to ClusterEvent")
170171
}
171172

172-
return fmt.Sprintf("%s-%s", e.EventType, e.UID), nil
173+
return queueClusterKey(e.EventType, e.UID), nil
173174
})
174175
}
175176

@@ -206,3 +207,7 @@ func (c *Controller) runPostgresqlInformer(stopCh <-chan struct{}, wg *sync.Wait
206207

207208
c.postgresqlInformer.Run(stopCh)
208209
}
210+
211+
func queueClusterKey(eventType spec.EventType, uid types.UID) string {
212+
return fmt.Sprintf("%s-%s", eventType, uid)
213+
}

pkg/controller/postgresql.go

Lines changed: 32 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -193,26 +193,20 @@ func (c *Controller) processEvent(event spec.ClusterEvent) {
193193
cl.Error = nil
194194
lg.Infoln("cluster has been updated")
195195
case spec.EventDelete:
196-
teamName := strings.ToLower(cl.Spec.TeamID)
197-
198-
lg.Infoln("Deletion of the cluster started")
199196
if !clusterFound {
200197
lg.Errorf("unknown cluster: %q", clusterName)
201198
return
202199
}
200+
lg.Infoln("deletion of the cluster started")
203201

204-
if err := cl.Delete(); err != nil {
205-
lg.Errorf("could not delete cluster: %v", err)
206-
return
207-
}
208-
202+
teamName := strings.ToLower(cl.Spec.TeamID)
209203
func() {
210204
defer c.clustersMu.Unlock()
211205
c.clustersMu.Lock()
212206

213207
delete(c.clusters, clusterName)
214208
delete(c.clusterLogs, clusterName)
215-
for i, val := range c.teamClusters[teamName] { // on relativel
209+
for i, val := range c.teamClusters[teamName] {
216210
if val == clusterName {
217211
copy(c.teamClusters[teamName][i:], c.teamClusters[teamName][i+1:])
218212
c.teamClusters[teamName][len(c.teamClusters[teamName])-1] = spec.NamespacedName{}
@@ -222,6 +216,11 @@ func (c *Controller) processEvent(event spec.ClusterEvent) {
222216
}
223217
}()
224218

219+
if err := cl.Delete(); err != nil {
220+
lg.Errorf("could not delete cluster: %v", err)
221+
return
222+
}
223+
225224
lg.Infof("cluster has been deleted")
226225
case spec.EventSync:
227226
lg.Infof("syncing of the cluster started")
@@ -305,13 +304,35 @@ func (c *Controller) queueClusterEvent(old, new *spec.Postgresql, eventType spec
305304
NewSpec: new,
306305
WorkerID: workerID,
307306
}
308-
//TODO: if we delete cluster, discard all the previous events for the cluster
309307

310308
lg := c.logger.WithField("worker", workerID).WithField("cluster-name", clusterName)
311309
if err := c.clusterEventQueues[workerID].Add(clusterEvent); err != nil {
312-
lg.Errorf("error when queueing cluster event: %v", clusterEvent)
310+
lg.Errorf("error while queueing cluster event: %v", clusterEvent)
313311
}
314312
lg.Infof("%q event has been queued", eventType)
313+
314+
if eventType != spec.EventDelete {
315+
return
316+
}
317+
318+
for _, evType := range []spec.EventType{spec.EventAdd, spec.EventSync, spec.EventUpdate} {
319+
obj, exists, err := c.clusterEventQueues[workerID].GetByKey(queueClusterKey(evType, uid))
320+
if err != nil {
321+
lg.Warningf("could not get event from the queue: %v", err)
322+
continue
323+
}
324+
325+
if !exists {
326+
continue
327+
}
328+
329+
err = c.clusterEventQueues[workerID].Delete(obj)
330+
if err != nil {
331+
lg.Warningf("could not delete event from the queue: %v", err)
332+
} else {
333+
lg.Debugf("event %q has been discarded for the cluster", evType)
334+
}
335+
}
315336
}
316337

317338
func (c *Controller) postgresqlAdd(obj interface{}) {

0 commit comments

Comments
 (0)