-
Notifications
You must be signed in to change notification settings - Fork 5
Expand file tree
/
Copy pathmanager.go
More file actions
246 lines (204 loc) · 7.21 KB
/
manager.go
File metadata and controls
246 lines (204 loc) · 7.21 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
package deduplication
import (
"fmt"
"sync"
"time"
"github.com/kagent-dev/khook/internal/interfaces"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/log"
)
const (
// EventTimeoutDuration is the duration after which events are considered resolved
EventTimeoutDuration = 10 * time.Minute
// NotificationSuppressionDuration is the window to suppress re-sending after success
NotificationSuppressionDuration = 30 * time.Second
// StatusFiring indicates an event is currently active
StatusFiring = "firing"
// StatusResolved indicates an event has been resolved (timed out)
StatusResolved = "resolved"
)
// Manager implements the DeduplicationManager interface with in-memory storage
type Manager struct {
// hookEvents maps hook names to their active events
// hookName -> eventKey -> ActiveEvent
hookEvents map[string]map[string]*interfaces.ActiveEvent
mutex sync.RWMutex
}
// NewManager creates a new DeduplicationManager instance
func NewManager() *Manager {
return &Manager{
hookEvents: make(map[string]map[string]*interfaces.ActiveEvent),
}
}
// eventKey generates a unique key for an event based on type and resource
func (m *Manager) eventKey(event interfaces.Event) string {
return fmt.Sprintf("%s:%s:%s", event.Type, event.Namespace, event.ResourceName)
}
// ShouldProcessEvent determines if an event should be processed based on deduplication logic
func (m *Manager) ShouldProcessEvent(hookRef types.NamespacedName, event interfaces.Event) bool {
logger := log.Log.WithName("dedup").WithValues("hook", hookRef.String(), "eventType", event.Type, "resource", event.ResourceName)
m.mutex.RLock()
defer m.mutex.RUnlock()
hookEventMap, exists := m.hookEvents[hookRef.String()]
if !exists {
// No events for this hook, should process
logger.V(1).Info("No existing events for hook; will process")
return true
}
key := m.eventKey(event)
activeEvent, exists := hookEventMap[key]
if !exists {
// Event doesn't exist, should process
logger.V(1).Info("First occurrence of event; will process")
return true
}
// Suppress if we recently notified and within suppression window
if activeEvent.LastNotifiedAt != nil && time.Since(*activeEvent.LastNotifiedAt) < NotificationSuppressionDuration {
logger.V(1).Info("Within notification suppression window; will ignore",
"lastNotifiedAt", *activeEvent.LastNotifiedAt)
return false
}
// Check if event has expired (more than 10 minutes old)
if time.Since(activeEvent.FirstSeen) > EventTimeoutDuration {
// Event has expired, should process as new event
logger.V(1).Info("Event expired; will process as new", "firstSeen", activeEvent.FirstSeen)
return true
}
// Event is still active within timeout window, should not process
logger.V(2).Info("Duplicate within timeout; will ignore", "firstSeen", activeEvent.FirstSeen)
return false
}
// RecordEvent records an event in the deduplication storage
func (m *Manager) RecordEvent(hookRef types.NamespacedName, event interfaces.Event) error {
logger := log.Log.WithName("dedup").WithValues("hook", hookRef.String(), "eventType", event.Type, "resource", event.ResourceName)
m.mutex.Lock()
defer m.mutex.Unlock()
// Initialize hook event map if it doesn't exist
if m.hookEvents[hookRef.String()] == nil {
m.hookEvents[hookRef.String()] = make(map[string]*interfaces.ActiveEvent)
}
key := m.eventKey(event)
now := time.Now()
// Check if event already exists
if existingEvent, exists := m.hookEvents[hookRef.String()][key]; exists {
// Update existing event
existingEvent.LastSeen = now
existingEvent.Status = StatusFiring
logger.V(1).Info("Updated existing active event", "lastSeen", existingEvent.LastSeen)
} else {
// Create new event record
m.hookEvents[hookRef.String()][key] = &interfaces.ActiveEvent{
EventType: event.Type,
ResourceName: event.ResourceName,
FirstSeen: now,
LastSeen: now,
Status: StatusFiring,
}
logger.Info("Recorded new active event", "firstSeen", now)
}
return nil
}
// MarkNotified marks that we successfully notified the agent for this event now
func (m *Manager) MarkNotified(hookRef types.NamespacedName, event interfaces.Event) {
m.mutex.Lock()
defer m.mutex.Unlock()
if m.hookEvents[hookRef.String()] == nil {
m.hookEvents[hookRef.String()] = make(map[string]*interfaces.ActiveEvent)
}
key := m.eventKey(event)
now := time.Now()
if ae, ok := m.hookEvents[hookRef.String()][key]; ok {
ae.LastNotifiedAt = &now
if ae.NotifiedAt == nil {
ae.NotifiedAt = &now
}
} else {
m.hookEvents[hookRef.String()][key] = &interfaces.ActiveEvent{
EventType: event.Type,
ResourceName: event.ResourceName,
FirstSeen: now,
LastSeen: now,
Status: StatusFiring,
NotifiedAt: &now,
LastNotifiedAt: &now,
}
}
}
// CleanupExpiredEvents removes events that have exceeded the timeout duration
func (m *Manager) CleanupExpiredEvents(hookRef types.NamespacedName) error {
m.mutex.Lock()
defer m.mutex.Unlock()
hookEventMap, exists := m.hookEvents[hookRef.String()]
if !exists {
// No events for this hook
return nil
}
now := time.Now()
expiredKeys := make([]string, 0)
// Find expired events
for key, activeEvent := range hookEventMap {
if now.Sub(activeEvent.FirstSeen) > EventTimeoutDuration {
// Mark as resolved before removal
activeEvent.Status = StatusResolved
expiredKeys = append(expiredKeys, key)
}
}
// Remove expired events
for _, key := range expiredKeys {
delete(hookEventMap, key)
}
// Clean up empty hook map
if len(hookEventMap) == 0 {
delete(m.hookEvents, hookRef.String())
}
return nil
}
// GetActiveEvents returns all active events for a specific hook
func (m *Manager) GetActiveEvents(hookRef types.NamespacedName) []interfaces.ActiveEvent {
m.mutex.RLock()
defer m.mutex.RUnlock()
hookEventMap, exists := m.hookEvents[hookRef.String()]
if !exists {
return []interfaces.ActiveEvent{}
}
activeEvents := make([]interfaces.ActiveEvent, 0, len(hookEventMap))
for _, activeEvent := range hookEventMap {
// Create a copy to avoid returning pointers to internal data
eventCopy := *activeEvent
activeEvents = append(activeEvents, eventCopy)
}
return activeEvents
}
// GetActiveEventsWithStatus returns all active events with their current status
// This method handles status calculation without race conditions
func (m *Manager) GetActiveEventsWithStatus(hookRef types.NamespacedName) []interfaces.ActiveEvent {
activeEvents := m.GetActiveEvents(hookRef)
now := time.Now()
for i := range activeEvents {
// Check if event should be marked as resolved
if now.Sub(activeEvents[i].FirstSeen) > EventTimeoutDuration {
activeEvents[i].Status = StatusResolved
}
}
return activeEvents
}
// GetAllHookNames returns all hook names that have active events
func (m *Manager) GetAllHookNames() []string {
m.mutex.RLock()
defer m.mutex.RUnlock()
hookNames := make([]string, 0, len(m.hookEvents))
for hookName := range m.hookEvents {
hookNames = append(hookNames, hookName)
}
return hookNames
}
// GetEventCount returns the total number of active events across all hooks
func (m *Manager) GetEventCount() int {
m.mutex.RLock()
defer m.mutex.RUnlock()
count := 0
for _, hookEventMap := range m.hookEvents {
count += len(hookEventMap)
}
return count
}