Skip to content

Commit e5697fc

Browse files
authored
RSDK-9876: Add counts for each {component, gRPC method} invocation. (#4765)
1 parent 362f26f commit e5697fc

File tree

2 files changed

+71
-1
lines changed

2 files changed

+71
-1
lines changed

robot/impl/local_robot.go

+3
Original file line numberDiff line numberDiff line change
@@ -458,6 +458,9 @@ func newWithResources(
458458
// we assume these never appear in our configs and as such will not be removed from the
459459
// resource graph
460460
r.webSvc = web.New(r, logger, rOpts.webOptions...)
461+
if r.ftdc != nil {
462+
r.ftdc.Add("web", r.webSvc.RequestCounter())
463+
}
461464
r.frameSvc, err = framesystem.New(ctx, resource.Dependencies{}, logger)
462465
if err != nil {
463466
return nil, err

robot/web/web.go

+68-1
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,8 @@ type Service interface {
7979
ModuleAddress() string
8080

8181
Stats() any
82+
83+
RequestCounter() *RequestCounter
8284
}
8385

8486
type webService struct {
@@ -101,6 +103,8 @@ type webService struct {
101103
isRunning bool
102104
webWorkers sync.WaitGroup
103105
modWorkers sync.WaitGroup
106+
107+
requestCounter RequestCounter
104108
}
105109

106110
var internalWebServiceName = resource.NewName(
@@ -220,6 +224,7 @@ func (svc *webService) StartModule(ctx context.Context) error {
220224
// Attach the module name (as defined by the robot config) to the handler context. Can be
221225
// accessed via `grpc.GetModuleName`.
222226
unaryInterceptors = append(unaryInterceptors, grpc.ModNameUnaryServerInterceptor)
227+
unaryInterceptors = append(unaryInterceptors, svc.requestCounter.UnaryInterceptor)
223228

224229
opManager := svc.r.OperationManager()
225230
unaryInterceptors = append(unaryInterceptors,
@@ -498,6 +503,68 @@ func (svc *webService) runWeb(ctx context.Context, options weboptions.Options) (
498503
return err
499504
}
500505

506+
// Namer is used to get a resource name from incoming requests for countingfor request. Requests for
507+
// resources are expected to be a gRPC object that includes a `GetName` method.
508+
type Namer interface {
509+
GetName() string
510+
}
511+
512+
// RequestCounter maps string keys to atomic ints that get bumped on every incoming gRPC request for
513+
// components.
514+
type RequestCounter struct {
515+
counts sync.Map
516+
}
517+
518+
// UnaryInterceptor returns an incoming server interceptor that will pull method information and
519+
// optionally resource information to bump the request counters.
520+
func (rc *RequestCounter) UnaryInterceptor(
521+
ctx context.Context, req any, info *googlegrpc.UnaryServerInfo, handler googlegrpc.UnaryHandler,
522+
) (resp any, err error) {
523+
// Handle `info.FullMethod` values such as:
524+
// - `/viam.component.motor.v1.MotorService/IsMoving`
525+
// - `/viam.robot.v1.RobotService/SendSessionHeartbeat`
526+
//
527+
// Only count component APIs, for now.
528+
var apiMethod string
529+
switch {
530+
case strings.HasPrefix(info.FullMethod, "/viam.component."):
531+
apiMethod = info.FullMethod[strings.LastIndexByte(info.FullMethod, byte('/'))+1:]
532+
default:
533+
}
534+
535+
// Storing in FTDC: `web.motor-name.IsMoving: <count>`.
536+
if apiMethod != "" {
537+
if namer, ok := req.(Namer); ok {
538+
key := fmt.Sprintf("%v.%v", namer.GetName(), apiMethod)
539+
if apiCounts, ok := rc.counts.Load(key); ok {
540+
apiCounts.(*atomic.Int64).Add(1)
541+
} else {
542+
newCounter := new(atomic.Int64)
543+
newCounter.Add(1)
544+
rc.counts.Store(key, newCounter)
545+
}
546+
}
547+
}
548+
549+
return handler(ctx, req)
550+
}
551+
552+
// Stats satisfies the ftdc.Statser interface and will return a copy of the counters.
553+
func (rc *RequestCounter) Stats() any {
554+
ret := make(map[string]int64)
555+
rc.counts.Range(func(key, value any) bool {
556+
ret[key.(string)] = value.(*atomic.Int64).Load()
557+
return true
558+
})
559+
560+
return ret
561+
}
562+
563+
// RequestCounter returns the request counter object.
564+
func (svc *webService) RequestCounter() *RequestCounter {
565+
return &svc.requestCounter
566+
}
567+
501568
// Initialize RPC Server options.
502569
func (svc *webService) initRPCOptions(listenerTCPAddr *net.TCPAddr, options weboptions.Options) ([]rpc.ServerOption, error) {
503570
hosts := options.GetHosts(listenerTCPAddr)
@@ -530,8 +597,8 @@ func (svc *webService) initRPCOptions(listenerTCPAddr *net.TCPAddr, options webo
530597
}
531598

532599
var unaryInterceptors []googlegrpc.UnaryServerInterceptor
533-
534600
unaryInterceptors = append(unaryInterceptors, grpc.EnsureTimeoutUnaryServerInterceptor)
601+
unaryInterceptors = append(unaryInterceptors, svc.requestCounter.UnaryInterceptor)
535602

536603
if options.Debug {
537604
rpcOpts = append(rpcOpts, rpc.WithDebug())

0 commit comments

Comments
 (0)