diff --git a/pkg/agent/dispatcher.go b/pkg/agent/dispatcher.go
index e0ddc826d..c8b0b005c 100644
--- a/pkg/agent/dispatcher.go
+++ b/pkg/agent/dispatcher.go
@@ -46,7 +46,7 @@ type Dispatcher struct {
 	// to it so that it knows when a response is back. When it receives a message
 	// with the same transaction id, it knows that that is the repsonse to the original
 	// message and will send it down the SignalSender so the original sender can use it.
-	waiters map[uint64]util.SignalSender[*MonitorResult]
+	waiters map[uint64]util.SignalSender[waiterResult]
 
 	// lock guards mutating the waiters field. conn, logger, and nextTransactionID
 	// are all thread safe. server and protoVersion are never modified.
@@ -68,6 +68,11 @@ type Dispatcher struct {
 	protoVersion api.MonitorProtoVersion
 }
 
+type waiterResult struct {
+	err error
+	res *MonitorResult
+}
+
 // Create a new Dispatcher, establishing a connection with the informant.
 // Note that this does not immediately start the Dispatcher. Call Run() to start it.
 func NewDispatcher(
@@ -117,7 +122,7 @@ func NewDispatcher(
 
 	disp := &Dispatcher{
 		conn:              c,
-		waiters:           make(map[uint64]util.SignalSender[*MonitorResult]),
+		waiters:           make(map[uint64]util.SignalSender[waiterResult]),
 		lastTransactionID: atomic.Uint64{}, // Note: initialized to 0, so it's even, as required.
 		logger:            logger.Named("dispatcher"),
 		protoVersion:      version.Version,
@@ -144,7 +149,7 @@ func (disp *Dispatcher) send(ctx context.Context, id uint64, message any) error
 
 // registerWaiter registers a util.SignalSender to get notified when a
 // message with the given id arrives.
-func (disp *Dispatcher) registerWaiter(id uint64, sender util.SignalSender[*MonitorResult]) {
+func (disp *Dispatcher) registerWaiter(id uint64, sender util.SignalSender[waiterResult]) {
 	disp.lock.Lock()
 	defer disp.lock.Unlock()
 	disp.waiters[id] = sender
@@ -159,9 +164,19 @@ func (disp *Dispatcher) unregisterWaiter(id uint64) {
 
 // Make a request to the monitor and wait for a response. The value passed as message must be a
 // valid value to send to the monitor. See the docs for SerializeInformantMessage for more.
-func (disp *Dispatcher) Call(ctx context.Context, timeout time.Duration, message any) (*MonitorResult, error) {
+func (disp *Dispatcher) Call(
+	ctx context.Context,
+	timeout time.Duration,
+	messageType string,
+	message any,
+) (*MonitorResult, error) {
 	id := disp.lastTransactionID.Add(2)
-	sender, receiver := util.NewSingleSignalPair[*MonitorResult]()
+	sender, receiver := util.NewSingleSignalPair[waiterResult]()
+
+	status := "internal error"
+	defer func() {
+		disp.server.runner.global.metrics.informantRequestsOutbound.WithLabelValues(messageType, status).Inc()
+	}()
 	// register the waiter *before* sending, so that we avoid a potential race where we'd get a
 	// reply to the message before being ready to receive it.
 	disp.registerWaiter(id, sender)
@@ -170,6 +185,7 @@ func (disp *Dispatcher) Call(ctx context.Context, timeout time.Duration, message
 	if err != nil {
 		disp.logger.Error("failed to send message", zap.Any("message", message), zap.Error(err))
 		disp.unregisterWaiter(id)
+		status = "[error: failed to send]"
 		return nil, err
 	}
 
@@ -178,13 +194,17 @@ func (disp *Dispatcher) Call(ctx context.Context, timeout time.Duration, message
 
 	select {
 	case result := <-receiver.Recv():
-		if result == nil {
+		if result.err != nil {
+			status = fmt.Sprintf("[error: %s]", result.err)
 			return nil, errors.New("monitor experienced an internal error")
 		}
-		return result, nil
+
+		status = "ok"
+		return result.res, nil
 	case <-timer.C:
 		err := fmt.Errorf("timed out waiting %v for monitor response", timeout)
 		disp.unregisterWaiter(id)
+		status = "[error: timed out waiting for response]"
 		return nil, err
 	}
 }
@@ -250,20 +270,51 @@ func (disp *Dispatcher) HandleMessage(
 		return fmt.Errorf("error extracting 'id field: %w", err)
 	}
 	id := uint64(*f)
+
+	var rootErr error
+
 	// now that we have the waiter's ID, make sure that if there's some failure past this point, we
 	// propagate that along to the monitor and remove it
 	defer func() {
+		// speculatively determine the root error, to send that along to the instance of Call
+		// waiting for it.
+		var err error
+
+		panicPayload := recover()
+		if panicPayload != nil {
+			err = errors.New("panicked")
+		} else if rootErr != nil {
+			err = rootErr
+		} else {
+			// if HandleMessage bailed without panicking or setting rootErr, but *also* without
+			// sending a message to the waiter, we should make sure that *something* gets sent, so
+			// the message doesn't just time out. But we don't have more information, so the error
+			// is still just "unknown".
+			err = errors.New("unknown")
+		}
+
 		disp.lock.Lock()
 		defer disp.lock.Unlock()
 		if sender, ok := disp.waiters[id]; ok {
-			sender.Send(nil) // nil for failure
+			sender.Send(waiterResult{err: err, res: nil})
 			delete(disp.waiters, id)
+		} else if rootErr != nil {
+			// we had some error while handling the message with this ID, and there wasn't a
+			// corresponding waiter. We should make note of this in the metrics:
+			status := fmt.Sprintf("[error: %s]", rootErr)
+			disp.server.runner.global.metrics.informantRequestsInbound.WithLabelValues(*typeStr, status)
+		}
+
+		// resume panicking if we were before
+		if panicPayload != nil {
+			panic(panicPayload)
		}
 	}()
 
 	// Helper function to handle common unmarshalling logic
 	unmarshal := func(value any) error {
 		if err := json.Unmarshal(message, value); err != nil {
+			rootErr = errors.New("failed unmarshaling JSON")
 			err := fmt.Errorf("error unmarshaling %s: %w", *typeStr, err)
 			// we're already on the error path anyways
 			_ = disp.send(ctx, id, api.InvalidMessage{Error: err.Error()})
@@ -313,6 +364,7 @@ func (disp *Dispatcher) HandleMessage(
 		disp.logger.Warn("received notification we sent an invalid message", zap.Any("warning", warning))
 		return nil
 	default:
+		rootErr = errors.New("received unknown message type")
 		return disp.send(
 			ctx,
 			id,
@@ -342,6 +394,11 @@ func (disp *Dispatcher) run(ctx context.Context) {
 		disp.server.runner.lock.Lock()
 		defer disp.server.runner.lock.Unlock()
 
+		// TODO: it shouldn't be this function's responsibility to update metrics.
+		defer func() {
+			disp.server.runner.global.metrics.informantRequestsInbound.WithLabelValues("UpscaleRequest", "ok")
+		}()
+
 		disp.server.upscaleRequested.Send()
 
 		resourceReq := api.MoreResources{
@@ -363,10 +420,13 @@ func (disp *Dispatcher) run(ctx context.Context) {
 		sender, ok := disp.waiters[id]
 		if ok {
 			logger.Info("monitor confirmed upscale", zap.Uint64("id", id))
-			sender.Send(&MonitorResult{
-				Confirmation: &api.UpscaleConfirmation{},
-				Result:       nil,
-				HealthCheck:  nil,
+			sender.Send(waiterResult{
+				err: nil,
+				res: &MonitorResult{
+					Confirmation: &api.UpscaleConfirmation{},
+					Result:       nil,
+					HealthCheck:  nil,
+				},
 			})
 			// Don't forget to delete the waiter
 			delete(disp.waiters, id)
@@ -382,10 +442,13 @@ func (disp *Dispatcher) run(ctx context.Context) {
 		sender, ok := disp.waiters[id]
 		if ok {
 			logger.Info("monitor returned downscale result", zap.Uint64("id", id))
-			sender.Send(&MonitorResult{
-				Result:       &res,
-				Confirmation: nil,
-				HealthCheck:  nil,
+			sender.Send(waiterResult{
+				err: nil,
+				res: &MonitorResult{
+					Result:       &res,
+					Confirmation: nil,
+					HealthCheck:  nil,
+				},
 			})
 			// Don't forget to delete the waiter
 			delete(disp.waiters, id)
@@ -406,7 +469,10 @@ func (disp *Dispatcher) run(ctx context.Context) {
 				zap.Uint64("id", id),
 			)
 			// Indicate to the receiver that an error occured
-			sender.Send(nil)
+			sender.Send(waiterResult{
+				err: errors.New("monitor internal error"),
+				res: nil,
+			})
 			// Don't forget to delete the waiter
 			delete(disp.waiters, id)
 			return nil
@@ -422,10 +488,13 @@ func (disp *Dispatcher) run(ctx context.Context) {
 		sender, ok := disp.waiters[id]
 		if ok {
 			logger.Info("monitor responded to health check", zap.Uint64("id", id))
 			// Indicate to the receiver that an error occured
-			sender.Send(&MonitorResult{
-				HealthCheck:  &api.HealthCheck{},
-				Result:       nil,
-				Confirmation: nil,
+			sender.Send(waiterResult{
+				err: nil,
+				res: &MonitorResult{
+					HealthCheck:  &api.HealthCheck{},
+					Result:       nil,
+					Confirmation: nil,
+				},
 			})
 			// Don't forget to delete the waiter
 			delete(disp.waiters, id)
diff --git a/pkg/agent/informant.go b/pkg/agent/informant.go
index 37ad5e364..a76269aa0 100644
--- a/pkg/agent/informant.go
+++ b/pkg/agent/informant.go
@@ -1158,7 +1158,7 @@ func (s *InformantServer) Upscale(ctx context.Context, logger *zap.Logger, to ap
 func (s *InformantServer) MonitorHealthCheck(ctx context.Context, logger *zap.Logger) error {
 	timeout := time.Second * time.Duration(s.runner.global.config.Monitor.ResponseTimeoutSeconds)
 
-	_, err := s.dispatcher.Call(ctx, timeout, api.HealthCheck{})
+	_, err := s.dispatcher.Call(ctx, timeout, "HealthCheck", api.HealthCheck{})
 
 	s.runner.lock.Lock()
 	defer s.runner.lock.Unlock()
@@ -1197,7 +1197,7 @@ func (s *InformantServer) MonitorUpscale(ctx context.Context, logger *zap.Logger
 
 	timeout := time.Second * time.Duration(s.runner.global.config.Monitor.ResponseTimeoutSeconds)
 
-	_, err := s.dispatcher.Call(ctx, timeout, api.UpscaleNotification{
+	_, err := s.dispatcher.Call(ctx, timeout, "UpscaleNotification", api.UpscaleNotification{
 		Granted: api.Allocation{Cpu: cpu, Mem: mem},
 	})
 	if err != nil {
@@ -1227,7 +1227,7 @@ func (s *InformantServer) MonitorDownscale(ctx context.Context, logger *zap.Logg
 
 	timeout := time.Second * time.Duration(s.runner.global.config.Monitor.ResponseTimeoutSeconds)
 
-	res, err := s.dispatcher.Call(ctx, timeout, api.DownscaleRequest{
+	res, err := s.dispatcher.Call(ctx, timeout, "DownscaleRequest", api.DownscaleRequest{
 		Target: api.Allocation{Cpu: cpu, Mem: mem},
 	})
 	if err != nil {