agent/dispatcher: Add request(ish) message metrics (#500)
sharnoff authored Aug 24, 2023
1 parent 286b2be commit 3a64859
Showing 2 changed files with 93 additions and 24 deletions.
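This commit labels each agent/monitor message with a metric: outbound requests are counted in Call() by message type and final status, and inbound requests are counted where they are handled. The counter definitions themselves are not part of this diff; as a rough sketch of what they plausibly look like (assuming prometheus/client_golang, with the metric names, help strings, and label names invented for illustration):

```go
package agent

import "github.com/prometheus/client_golang/prometheus"

// Hypothetical declarations for the two counters this diff increments; the
// real definitions live elsewhere in the agent and may differ.
type agentMetrics struct {
    informantRequestsOutbound *prometheus.CounterVec
    informantRequestsInbound  *prometheus.CounterVec
}

func newAgentMetrics(reg prometheus.Registerer) agentMetrics {
    m := agentMetrics{
        informantRequestsOutbound: prometheus.NewCounterVec(
            prometheus.CounterOpts{
                Name: "autoscaling_agent_informant_outbound_requests_total",
                Help: "Requests sent to the monitor, by message type and result",
            },
            []string{"message_type", "status"}, // matches WithLabelValues(messageType, status)
        ),
        informantRequestsInbound: prometheus.NewCounterVec(
            prometheus.CounterOpts{
                Name: "autoscaling_agent_informant_inbound_requests_total",
                Help: "Requests received from the monitor, by message type and result",
            },
            []string{"message_type", "status"},
        ),
    }
    reg.MustRegister(m.informantRequestsOutbound, m.informantRequestsInbound)
    return m
}
```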
111 changes: 90 additions & 21 deletions pkg/agent/dispatcher.go
@@ -46,7 +46,7 @@ type Dispatcher struct {
    // to it so that it knows when a response is back. When it receives a message
    // with the same transaction id, it knows that it is the response to the original
    // message and will send it down the SignalSender so the original sender can use it.
-   waiters map[uint64]util.SignalSender[*MonitorResult]
+   waiters map[uint64]util.SignalSender[waiterResult]

    // lock guards mutating the waiters field. conn, logger, and nextTransactionID
    // are all thread safe. server and protoVersion are never modified.
@@ -68,6 +68,11 @@ type Dispatcher struct {
    protoVersion api.MonitorProtoVersion
}

+ type waiterResult struct {
+     err error
+     res *MonitorResult
+ }
+
// Create a new Dispatcher, establishing a connection with the informant.
// Note that this does not immediately start the Dispatcher. Call Run() to start it.
func NewDispatcher(
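util.SignalSender and util.NewSingleSignalPair come from the repo's util package and are not shown in this diff. From their usage here (a single Send, and a Recv() that returns a channel to select over), a minimal stand-in might look like the following; the real implementation may differ:

```go
package util

// SignalSender delivers at most one value of type T to its paired receiver.
type SignalSender[T any] struct{ ch chan<- T }

// SignalReceiver exposes the channel carrying that one value.
type SignalReceiver[T any] struct{ ch <-chan T }

// NewSingleSignalPair returns a connected sender/receiver pair intended to
// carry exactly one value.
func NewSingleSignalPair[T any]() (SignalSender[T], SignalReceiver[T]) {
    ch := make(chan T, 1) // buffered, so Send doesn't block if nobody is waiting yet
    return SignalSender[T]{ch: ch}, SignalReceiver[T]{ch: ch}
}

// Send hands the value to the receiver.
func (s SignalSender[T]) Send(value T) { s.ch <- value }

// Recv returns the channel so callers can select over it alongside timeouts.
func (r SignalReceiver[T]) Recv() <-chan T { return r.ch }
```

The switch from SignalSender[*MonitorResult] to SignalSender[waiterResult] is what lets waiters see *why* a request failed, instead of only receiving a nil result.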
@@ -117,7 +122,7 @@ func NewDispatcher(

    disp := &Dispatcher{
        conn: c,
-       waiters: make(map[uint64]util.SignalSender[*MonitorResult]),
+       waiters: make(map[uint64]util.SignalSender[waiterResult]),
        lastTransactionID: atomic.Uint64{}, // Note: initialized to 0, so it's even, as required.
        logger: logger.Named("dispatcher"),
        protoVersion: version.Version,
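The note on lastTransactionID above is doing real work: Call() allocates IDs with Add(2), so agent-initiated requests always carry even IDs (2, 4, 6, ...). Presumably the odd IDs are reserved for monitor-initiated messages (that split is not shown in this diff), so each side can match replies to its own requests without collisions. A tiny illustration, assuming sync/atomic:

```go
import "sync/atomic"

var lastTransactionID atomic.Uint64 // zero value is 0: even, as required

// nextAgentTransactionID mirrors the id allocation in Call().
func nextAgentTransactionID() uint64 {
    return lastTransactionID.Add(2) // yields 2, 4, 6, ...: always even
}
```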
@@ -144,7 +149,7 @@ func (disp *Dispatcher) send(ctx context.Context, id uint64, message any) error

// registerWaiter registers a util.SignalSender to get notified when a
// message with the given id arrives.
- func (disp *Dispatcher) registerWaiter(id uint64, sender util.SignalSender[*MonitorResult]) {
+ func (disp *Dispatcher) registerWaiter(id uint64, sender util.SignalSender[waiterResult]) {
    disp.lock.Lock()
    defer disp.lock.Unlock()
    disp.waiters[id] = sender
@@ -159,9 +164,19 @@ func (disp *Dispatcher) unregisterWaiter(id uint64) {

// Make a request to the monitor and wait for a response. The value passed as message must be a
// valid value to send to the monitor. See the docs for SerializeInformantMessage for more.
- func (disp *Dispatcher) Call(ctx context.Context, timeout time.Duration, message any) (*MonitorResult, error) {
+ func (disp *Dispatcher) Call(
+     ctx context.Context,
+     timeout time.Duration,
+     messageType string,
+     message any,
+ ) (*MonitorResult, error) {
    id := disp.lastTransactionID.Add(2)
-   sender, receiver := util.NewSingleSignalPair[*MonitorResult]()
+   sender, receiver := util.NewSingleSignalPair[waiterResult]()
+
+   status := "internal error"
+   defer func() {
+       disp.server.runner.global.metrics.informantRequestsOutbound.WithLabelValues(messageType, status).Inc()
+   }()

    // register the waiter *before* sending, so that we avoid a potential race where we'd get a
    // reply to the message before being ready to receive it.
@@ -170,6 +185,7 @@ func (disp *Dispatcher) Call(ctx context.Context, timeout time.Duration, message
    if err != nil {
        disp.logger.Error("failed to send message", zap.Any("message", message), zap.Error(err))
        disp.unregisterWaiter(id)
+       status = "[error: failed to send]"
        return nil, err
    }

@@ -178,13 +194,17 @@ func (disp *Dispatcher) Call(ctx context.Context, timeout time.Duration, message

    select {
    case result := <-receiver.Recv():
-       if result == nil {
+       if result.err != nil {
+           status = fmt.Sprintf("[error: %s]", result.err)
            return nil, errors.New("monitor experienced an internal error")
        }
-       return result, nil
+
+       status = "ok"
+       return result.res, nil
    case <-timer.C:
        err := fmt.Errorf("timed out waiting %v for monitor response", timeout)
        disp.unregisterWaiter(id)
+       status = "[error: timed out waiting for response]"
        return nil, err
    }
}
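The reworked Call() uses a common Go idiom for this kind of accounting: declare status up front, let every exit path overwrite it, and increment the counter exactly once in a deferred closure so that no return can be missed. A self-contained sketch of the pattern (the names and the stand-in recorder are illustrative only):

```go
package main

import (
    "errors"
    "fmt"
)

// recordOutcome stands in for the deferred counter increment.
func recordOutcome(messageType, status string) {
    fmt.Printf("%s -> %s\n", messageType, status)
}

func call(messageType string, sendFails bool) error {
    status := "internal error" // default if we somehow return without setting it
    defer func() {
        recordOutcome(messageType, status) // runs once, on every return path
    }()

    if sendFails {
        status = "[error: failed to send]"
        return errors.New("send failed")
    }

    status = "ok"
    return nil
}

func main() {
    _ = call("HealthCheck", false) // HealthCheck -> ok
    _ = call("HealthCheck", true)  // HealthCheck -> [error: failed to send]
}
```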
@@ -250,20 +270,51 @@ func (disp *Dispatcher) HandleMessage(
        return fmt.Errorf("error extracting 'id' field: %w", err)
    }
    id := uint64(*f)
+
+   var rootErr error
+
+   // now that we have the waiter's ID, make sure that if there's some failure past this point, we
+   // propagate that along to the waiter and remove it
    defer func() {
+       // speculatively determine the root error, to send that along to the instance of Call
+       // waiting for it.
+       var err error
+
+       panicPayload := recover()
+       if panicPayload != nil {
+           err = errors.New("panicked")
+       } else if rootErr != nil {
+           err = rootErr
+       } else {
+           // if HandleMessage bailed without panicking or setting rootErr, but *also* without
+           // sending a message to the waiter, we should make sure that *something* gets sent, so
+           // the message doesn't just time out. But we don't have more information, so the error
+           // is still just "unknown".
+           err = errors.New("unknown")
+       }
+
        disp.lock.Lock()
        defer disp.lock.Unlock()
        if sender, ok := disp.waiters[id]; ok {
-           sender.Send(nil) // nil for failure
+           sender.Send(waiterResult{err: err, res: nil})
            delete(disp.waiters, id)
+       } else if rootErr != nil {
+           // we had some error while handling the message with this ID, and there wasn't a
+           // corresponding waiter. We should make note of this in the metrics:
+           status := fmt.Sprintf("[error: %s]", rootErr)
+           disp.server.runner.global.metrics.informantRequestsInbound.WithLabelValues(*typeStr, status).Inc()
        }
+
+       // resume panicking if we were before
+       if panicPayload != nil {
+           panic(panicPayload)
+       }
    }()

    // Helper function to handle common unmarshaling logic
    unmarshal := func(value any) error {
        if err := json.Unmarshal(message, value); err != nil {
+           rootErr = errors.New("failed unmarshaling JSON")
            err := fmt.Errorf("error unmarshaling %s: %w", *typeStr, err)
            // we're already on the error path anyways
            _ = disp.send(ctx, id, api.InvalidMessage{Error: err.Error()})
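The deferred block above packs three behaviors together: recover() catches a panic, a speculative root error is chosen for whoever is waiting, and the panic is then re-raised so it still propagates. Stripped of the dispatcher details, the shape is roughly this (simplified: the real code only notifies a waiter that is still registered):

```go
package main

import (
    "errors"
    "fmt"
)

// handle reports its outcome through notify whether it returns normally,
// sets rootErr, or panics.
func handle(notify func(error), mode string) {
    var rootErr error

    defer func() {
        var err error
        panicPayload := recover() // non-nil only if the body below panicked
        if panicPayload != nil {
            err = errors.New("panicked")
        } else if rootErr != nil {
            err = rootErr
        } else {
            err = errors.New("unknown") // bailed without more information
        }
        notify(err) // stands in for sender.Send(waiterResult{err: err})

        if panicPayload != nil {
            panic(panicPayload) // resume panicking with the original payload
        }
    }()

    switch mode {
    case "panic":
        panic("boom")
    case "error":
        rootErr = errors.New("received unknown message type")
    }
}

func main() {
    report := func(err error) { fmt.Println("waiter got:", err) }
    handle(report, "error") // waiter got: received unknown message type
    handle(report, "ok")    // waiter got: unknown

    defer func() { fmt.Println("recovered again:", recover()) }()
    handle(report, "panic") // waiter got: panicked, then re-panics
}
```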
@@ -313,6 +364,7 @@ func (disp *Dispatcher) HandleMessage(
        disp.logger.Warn("received notification we sent an invalid message", zap.Any("warning", warning))
        return nil
    default:
+       rootErr = errors.New("received unknown message type")
        return disp.send(
            ctx,
            id,
@@ -342,6 +394,11 @@ func (disp *Dispatcher) run(ctx context.Context) {
        disp.server.runner.lock.Lock()
        defer disp.server.runner.lock.Unlock()

+       // TODO: it shouldn't be this function's responsibility to update metrics.
+       defer func() {
+           disp.server.runner.global.metrics.informantRequestsInbound.WithLabelValues("UpscaleRequest", "ok").Inc()
+       }()
+
        disp.server.upscaleRequested.Send()

        resourceReq := api.MoreResources{
@@ -363,10 +420,13 @@ func (disp *Dispatcher) run(ctx context.Context) {
        sender, ok := disp.waiters[id]
        if ok {
            logger.Info("monitor confirmed upscale", zap.Uint64("id", id))
-           sender.Send(&MonitorResult{
-               Confirmation: &api.UpscaleConfirmation{},
-               Result: nil,
-               HealthCheck: nil,
+           sender.Send(waiterResult{
+               err: nil,
+               res: &MonitorResult{
+                   Confirmation: &api.UpscaleConfirmation{},
+                   Result: nil,
+                   HealthCheck: nil,
+               },
            })
            // Don't forget to delete the waiter
            delete(disp.waiters, id)
@@ -382,10 +442,13 @@ func (disp *Dispatcher) run(ctx context.Context) {
        sender, ok := disp.waiters[id]
        if ok {
            logger.Info("monitor returned downscale result", zap.Uint64("id", id))
-           sender.Send(&MonitorResult{
-               Result: &res,
-               Confirmation: nil,
-               HealthCheck: nil,
+           sender.Send(waiterResult{
+               err: nil,
+               res: &MonitorResult{
+                   Result: &res,
+                   Confirmation: nil,
+                   HealthCheck: nil,
+               },
            })
            // Don't forget to delete the waiter
            delete(disp.waiters, id)
@@ -406,7 +469,10 @@ func (disp *Dispatcher) run(ctx context.Context) {
            zap.Uint64("id", id),
        )
        // Indicate to the receiver that an error occurred
-       sender.Send(nil)
+       sender.Send(waiterResult{
+           err: errors.New("monitor internal error"),
+           res: nil,
+       })
        // Don't forget to delete the waiter
        delete(disp.waiters, id)
        return nil
@@ -422,10 +488,13 @@ func (disp *Dispatcher) run(ctx context.Context) {
        if ok {
            logger.Info("monitor responded to health check", zap.Uint64("id", id))
            // Indicate to the receiver that the health check succeeded
-           sender.Send(&MonitorResult{
-               HealthCheck: &api.HealthCheck{},
-               Result: nil,
-               Confirmation: nil,
+           sender.Send(waiterResult{
+               err: nil,
+               res: &MonitorResult{
+                   HealthCheck: &api.HealthCheck{},
+                   Result: nil,
+                   Confirmation: nil,
+               },
            })
            // Don't forget to delete the waiter
            delete(disp.waiters, id)
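Each success branch in run() above follows the same lookup-send-delete shape under disp.lock. Deleting the waiter in the same critical section as the Send is what keeps HandleMessage's deferred failure path from sending a second result: by the time that defer runs, the waiter is gone. A hypothetical helper distilling the idiom (the diff inlines this at each call site; the caller must hold the lock guarding waiters):

```go
// notifyWaiter resolves the waiter for id exactly once, if it still exists.
// Caller must hold the lock guarding waiters.
func notifyWaiter(waiters map[uint64]util.SignalSender[waiterResult], id uint64, result waiterResult) bool {
    sender, ok := waiters[id]
    if !ok {
        return false // unknown or already-resolved transaction id
    }
    sender.Send(result)
    delete(waiters, id)
    return true
}
```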
6 changes: 3 additions & 3 deletions pkg/agent/informant.go
@@ -1158,7 +1158,7 @@ func (s *InformantServer) Upscale(ctx context.Context, logger *zap.Logger, to ap
func (s *InformantServer) MonitorHealthCheck(ctx context.Context, logger *zap.Logger) error {
    timeout := time.Second * time.Duration(s.runner.global.config.Monitor.ResponseTimeoutSeconds)

-   _, err := s.dispatcher.Call(ctx, timeout, api.HealthCheck{})
+   _, err := s.dispatcher.Call(ctx, timeout, "HealthCheck", api.HealthCheck{})

    s.runner.lock.Lock()
    defer s.runner.lock.Unlock()
@@ -1197,7 +1197,7 @@ func (s *InformantServer) MonitorUpscale(ctx context.Context, logger *zap.Logger

    timeout := time.Second * time.Duration(s.runner.global.config.Monitor.ResponseTimeoutSeconds)

-   _, err := s.dispatcher.Call(ctx, timeout, api.UpscaleNotification{
+   _, err := s.dispatcher.Call(ctx, timeout, "UpscaleNotification", api.UpscaleNotification{
        Granted: api.Allocation{Cpu: cpu, Mem: mem},
    })
    if err != nil {
@@ -1227,7 +1227,7 @@ func (s *InformantServer) MonitorDownscale(ctx context.Context, logger *zap.Logg

    timeout := time.Second * time.Duration(s.runner.global.config.Monitor.ResponseTimeoutSeconds)

-   res, err := s.dispatcher.Call(ctx, timeout, api.DownscaleRequest{
+   res, err := s.dispatcher.Call(ctx, timeout, "DownscaleRequest", api.DownscaleRequest{
        Target: api.Allocation{Cpu: cpu, Mem: mem},
    })
    if err != nil {
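In informant.go, each call site now passes a string label ("HealthCheck", "UpscaleNotification", "DownscaleRequest") that mirrors the Go type of the message, so the new messageType parameter is kept in sync by hand. An alternative design (a sketch, not what this commit does) would derive the label from the message itself, at the cost of a little reflection:

```go
import "reflect"

// messageTypeLabel derives a metric label from the message's concrete type,
// e.g. api.HealthCheck{} yields "HealthCheck". Explicit strings, as in this
// commit, keep metric labels stable even if a type is later renamed.
func messageTypeLabel(message any) string {
    return reflect.TypeOf(message).Name()
}
```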
