Skip to content

Commit a49f684

Browse files
committed
update tests for new typed interfaces
Signed-off-by: Eitan Yarmush <eitan.yarmush@solo.io>
1 parent 1bf2e76 commit a49f684

File tree

11 files changed

+188
-213
lines changed

11 files changed

+188
-213
lines changed

internal/client/kagent_client.go

Lines changed: 1 addition & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ package client
33
import (
44
"context"
55
"fmt"
6-
"reflect"
76
"strings"
87
"time"
98

@@ -202,19 +201,7 @@ func (c *Client) CallAgent(ctx context.Context, request interfaces.AgentRequest)
202201
return nil, fmt.Errorf("failed to send A2A message: %w", err)
203202
}
204203

205-
// Best-effort check whether a Task was returned (per A2A Life of a Task)
206-
isTask := false
207-
if res != nil {
208-
rv := reflect.ValueOf(res)
209-
if rv.Kind() == reflect.Ptr {
210-
rv = rv.Elem()
211-
}
212-
if rv.IsValid() {
213-
if f := rv.FieldByName("Task"); f.IsValid() && !f.IsZero() {
214-
isTask = true
215-
}
216-
}
217-
}
204+
_, isTask := res.Result.(*protocol.Task)
218205

219206
c.logger.Info("Agent accepted message via A2A",
220207
"agentRef", request.AgentRef.String(),

internal/deduplication/manager.go

Lines changed: 22 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"time"
77

88
"github.com/kagent-dev/khook/internal/interfaces"
9+
"k8s.io/apimachinery/pkg/types"
910
"sigs.k8s.io/controller-runtime/pkg/log"
1011
)
1112

@@ -43,12 +44,12 @@ func (m *Manager) eventKey(event interfaces.Event) string {
4344
}
4445

4546
// ShouldProcessEvent determines if an event should be processed based on deduplication logic
46-
func (m *Manager) ShouldProcessEvent(hookName string, event interfaces.Event) bool {
47-
logger := log.Log.WithName("dedup").WithValues("hook", hookName, "eventType", event.Type, "resource", event.ResourceName)
47+
func (m *Manager) ShouldProcessEvent(hookRef types.NamespacedName, event interfaces.Event) bool {
48+
logger := log.Log.WithName("dedup").WithValues("hook", hookRef.String(), "eventType", event.Type, "resource", event.ResourceName)
4849
m.mutex.RLock()
4950
defer m.mutex.RUnlock()
5051

51-
hookEventMap, exists := m.hookEvents[hookName]
52+
hookEventMap, exists := m.hookEvents[hookRef.String()]
5253
if !exists {
5354
// No events for this hook, should process
5455
logger.V(1).Info("No existing events for hook; will process")
@@ -83,28 +84,28 @@ func (m *Manager) ShouldProcessEvent(hookName string, event interfaces.Event) bo
8384
}
8485

8586
// RecordEvent records an event in the deduplication storage
86-
func (m *Manager) RecordEvent(hookName string, event interfaces.Event) error {
87-
logger := log.Log.WithName("dedup").WithValues("hook", hookName, "eventType", event.Type, "resource", event.ResourceName)
87+
func (m *Manager) RecordEvent(hookRef types.NamespacedName, event interfaces.Event) error {
88+
logger := log.Log.WithName("dedup").WithValues("hook", hookRef.String(), "eventType", event.Type, "resource", event.ResourceName)
8889
m.mutex.Lock()
8990
defer m.mutex.Unlock()
9091

9192
// Initialize hook event map if it doesn't exist
92-
if m.hookEvents[hookName] == nil {
93-
m.hookEvents[hookName] = make(map[string]*interfaces.ActiveEvent)
93+
if m.hookEvents[hookRef.String()] == nil {
94+
m.hookEvents[hookRef.String()] = make(map[string]*interfaces.ActiveEvent)
9495
}
9596

9697
key := m.eventKey(event)
9798
now := time.Now()
9899

99100
// Check if event already exists
100-
if existingEvent, exists := m.hookEvents[hookName][key]; exists {
101+
if existingEvent, exists := m.hookEvents[hookRef.String()][key]; exists {
101102
// Update existing event
102103
existingEvent.LastSeen = now
103104
existingEvent.Status = StatusFiring
104105
logger.V(1).Info("Updated existing active event", "lastSeen", existingEvent.LastSeen)
105106
} else {
106107
// Create new event record
107-
m.hookEvents[hookName][key] = &interfaces.ActiveEvent{
108+
m.hookEvents[hookRef.String()][key] = &interfaces.ActiveEvent{
108109
EventType: event.Type,
109110
ResourceName: event.ResourceName,
110111
FirstSeen: now,
@@ -118,21 +119,21 @@ func (m *Manager) RecordEvent(hookName string, event interfaces.Event) error {
118119
}
119120

120121
// MarkNotified marks that we successfully notified the agent for this event now
121-
func (m *Manager) MarkNotified(hookName string, event interfaces.Event) {
122+
func (m *Manager) MarkNotified(hookRef types.NamespacedName, event interfaces.Event) {
122123
m.mutex.Lock()
123124
defer m.mutex.Unlock()
124-
if m.hookEvents[hookName] == nil {
125-
m.hookEvents[hookName] = make(map[string]*interfaces.ActiveEvent)
125+
if m.hookEvents[hookRef.String()] == nil {
126+
m.hookEvents[hookRef.String()] = make(map[string]*interfaces.ActiveEvent)
126127
}
127128
key := m.eventKey(event)
128129
now := time.Now()
129-
if ae, ok := m.hookEvents[hookName][key]; ok {
130+
if ae, ok := m.hookEvents[hookRef.String()][key]; ok {
130131
ae.LastNotifiedAt = &now
131132
if ae.NotifiedAt == nil {
132133
ae.NotifiedAt = &now
133134
}
134135
} else {
135-
m.hookEvents[hookName][key] = &interfaces.ActiveEvent{
136+
m.hookEvents[hookRef.String()][key] = &interfaces.ActiveEvent{
136137
EventType: event.Type,
137138
ResourceName: event.ResourceName,
138139
FirstSeen: now,
@@ -145,11 +146,11 @@ func (m *Manager) MarkNotified(hookName string, event interfaces.Event) {
145146
}
146147

147148
// CleanupExpiredEvents removes events that have exceeded the timeout duration
148-
func (m *Manager) CleanupExpiredEvents(hookName string) error {
149+
func (m *Manager) CleanupExpiredEvents(hookRef types.NamespacedName) error {
149150
m.mutex.Lock()
150151
defer m.mutex.Unlock()
151152

152-
hookEventMap, exists := m.hookEvents[hookName]
153+
hookEventMap, exists := m.hookEvents[hookRef.String()]
153154
if !exists {
154155
// No events for this hook
155156
return nil
@@ -174,18 +175,18 @@ func (m *Manager) CleanupExpiredEvents(hookName string) error {
174175

175176
// Clean up empty hook map
176177
if len(hookEventMap) == 0 {
177-
delete(m.hookEvents, hookName)
178+
delete(m.hookEvents, hookRef.String())
178179
}
179180

180181
return nil
181182
}
182183

183184
// GetActiveEvents returns all active events for a specific hook
184-
func (m *Manager) GetActiveEvents(hookName string) []interfaces.ActiveEvent {
185+
func (m *Manager) GetActiveEvents(hookRef types.NamespacedName) []interfaces.ActiveEvent {
185186
m.mutex.RLock()
186187
defer m.mutex.RUnlock()
187188

188-
hookEventMap, exists := m.hookEvents[hookName]
189+
hookEventMap, exists := m.hookEvents[hookRef.String()]
189190
if !exists {
190191
return []interfaces.ActiveEvent{}
191192
}
@@ -204,8 +205,8 @@ func (m *Manager) GetActiveEvents(hookName string) []interfaces.ActiveEvent {
204205

205206
// GetActiveEventsWithStatus returns all active events with their current status
206207
// This method handles status calculation without race conditions
207-
func (m *Manager) GetActiveEventsWithStatus(hookName string) []interfaces.ActiveEvent {
208-
activeEvents := m.GetActiveEvents(hookName)
208+
func (m *Manager) GetActiveEventsWithStatus(hookRef types.NamespacedName) []interfaces.ActiveEvent {
209+
activeEvents := m.GetActiveEvents(hookRef)
209210

210211
now := time.Now()
211212
for i := range activeEvents {

0 commit comments

Comments
 (0)