From 95653927591582f7b4e68e3cc7241af6165ce1f7 Mon Sep 17 00:00:00 2001 From: Michel Hidalgo Date: Thu, 6 Feb 2025 13:06:48 -0300 Subject: [PATCH] Allow joint state and command streams' mocking Signed-off-by: Michel Hidalgo --- spot_wrapper/testing/grpc.py | 14 +++++++------- spot_wrapper/testing/helpers.py | 23 +++++++++++++++++++++++ spot_wrapper/testing/mocks/__init__.py | 16 +++++++++++----- 3 files changed, 41 insertions(+), 12 deletions(-) diff --git a/spot_wrapper/testing/grpc.py b/spot_wrapper/testing/grpc.py index 1c62dc9..76ba972 100644 --- a/spot_wrapper/testing/grpc.py +++ b/spot_wrapper/testing/grpc.py @@ -14,7 +14,7 @@ import grpc from bosdyn.api.header_pb2 import CommonError -from spot_wrapper.testing.helpers import ForwardingWrapper +from spot_wrapper.testing.helpers import ForwardingWrapper, cache1 def implemented(function: typing.Callable) -> bool: @@ -273,9 +273,9 @@ class AutoCompletingStreamUnaryRpcHandler(ForwardingWrapper): """ def __call__(self, request_iterator: typing.Iterator, context: grpc.ServicerContext) -> typing.Any: - *head_requests, tail_request = request_iterator - response = self.__wrapped__(iter([*head_requests, tail_request]), context) - fill_response_header(tail_request, response) + cached_request_iterator = cache1(request_iterator) + response = self.__wrapped__(cached_request_iterator, context) + fill_response_header(cached_request_iterator.cache, response) return response @@ -295,9 +295,9 @@ class AutoCompletingStreamStreamRpcHandler(ForwardingWrapper): """ def __call__(self, request_iterator: typing.Iterator, context: grpc.ServicerContext) -> typing.Iterator: - *head_requests, tail_request = request_iterator - for response in self.__wrapped__(iter([*head_requests, tail_request]), context): - fill_response_header(tail_request, response) + cached_request_iterator = cache1(request_iterator) + for response in self.__wrapped__(cached_request_iterator, context): + fill_response_header(cached_request_iterator.cache, response) yield response diff --git a/spot_wrapper/testing/helpers.py b/spot_wrapper/testing/helpers.py index b41db22..f631ef9 100644 --- a/spot_wrapper/testing/helpers.py +++ b/spot_wrapper/testing/helpers.py @@ -39,3 +39,26 @@ def walk_resource_tree(resource_tree: ResourceTree) -> typing.Iterable[ResourceT yield resource_tree for subtree in resource_tree.sub_resources: yield from walk_resource_tree(subtree) + + +_T = typing.TypeVar("_T") + + +class cache1(typing.Iterator[_T]): + """Iterator wrapper that caches the last item retrieved.""" + + def __init__(self, inner: typing.Iterator[_T]): + """Initialize cached iterator + + Args: + inner: inner iterator to cache + """ + self.__inner = inner + self.cache: typing.Optional[_T] = None + + def __iter__(self) -> "cache1": + return self + + def __next__(self) -> _T: + self.cache = next(self.__inner) + return self.cache diff --git a/spot_wrapper/testing/mocks/__init__.py b/spot_wrapper/testing/mocks/__init__.py index 58ecf0f..3d1b1ac 100644 --- a/spot_wrapper/testing/mocks/__init__.py +++ b/spot_wrapper/testing/mocks/__init__.py @@ -59,14 +59,18 @@ from bosdyn.api.power_service_pb2_grpc import ( PowerServiceServicer as PowerCommandServiceServicer, ) -from bosdyn.api.robot_command_service_pb2_grpc import RobotCommandServiceServicer +from bosdyn.api.robot_command_service_pb2_grpc import ( + RobotCommandServiceServicer, + RobotCommandStreamingServiceServicer, +) from bosdyn.api.robot_id_service_pb2_grpc import RobotIdServiceServicer -from bosdyn.api.robot_state_service_pb2_grpc import RobotStateServiceServicer +from bosdyn.api.robot_state_service_pb2_grpc import ( + RobotStateServiceServicer, + RobotStateStreamingServiceServicer, +) from bosdyn.api.spot.choreography_service_pb2_grpc import ChoreographyServiceServicer from bosdyn.api.spot.door_service_pb2_grpc import DoorServiceServicer -from bosdyn.api.spot.inverse_kinematics_service_pb2_grpc import ( - InverseKinematicsServiceServicer, -) +from bosdyn.api.spot.inverse_kinematics_service_pb2_grpc import InverseKinematicsServiceServicer from bosdyn.api.spot.spot_check_service_pb2_grpc import SpotCheckServiceServicer from bosdyn.api.spot_cam.service_pb2_grpc import ( AudioServiceServicer, @@ -145,8 +149,10 @@ class BaseMockSpot( PtzServiceServicer, RemoteMissionServiceServicer, RobotCommandServiceServicer, + RobotCommandStreamingServiceServicer, RobotIdServiceServicer, RobotStateServiceServicer, + RobotStateStreamingServiceServicer, SpotCheckServiceServicer, StreamQualityServiceServicer, TimeSyncServiceServicer,