Skip to content

Commit

Permalink
simplify senders list
Browse files Browse the repository at this point in the history
  • Loading branch information
sharnoff committed Jan 28, 2025
1 parent b4aed57 commit 864689e
Showing 1 changed file with 5 additions and 15 deletions.
20 changes: 5 additions & 15 deletions pkg/reporting/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,28 +25,18 @@ type EventSink[E any] struct {
func NewEventSink[E any](logger *zap.Logger, metrics *EventSinkMetrics, clients ...Client[E]) *EventSink[E] {
var queueWriters []eventQueuePusher[E]

var senders []struct {
name string
sender eventSender[E]
}
var senders []eventSender[E]

for _, c := range clients {
qw, qr := newEventQueue[E](metrics.queueSizeCurrent.WithLabelValues(c.Name))
queueWriters = append(queueWriters, qw)

// Create the sender -- we'll save starting it for the call to Run()
sender := eventSender[E]{
senders = append(senders, eventSender[E]{
client: c,
metrics: metrics,
queue: qr,
lastSendDuration: 0,
}
senders = append(senders, struct {
name string
sender eventSender[E]
}{
name: c.Name,
sender: sender,
})
}

Expand All @@ -55,10 +45,10 @@ func NewEventSink[E any](logger *zap.Logger, metrics *EventSinkMetrics, clients
runSenders = func(ctx context.Context) {
tg := taskgroup.NewGroup(logger, taskgroup.WithParentContext(ctx))

for _, s := range senders {
taskName := fmt.Sprintf("send-%s", s.name)
for _, sender := range senders {
taskName := fmt.Sprintf("send-%s", sender.client.Name)
tg.Go(taskName, func(logger *zap.Logger) error {
s.sender.senderLoop(tg.Ctx(), logger)
sender.senderLoop(tg.Ctx(), logger)
return nil
})
}
Expand Down

0 comments on commit 864689e

Please sign in to comment.