Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow joint state and command streams' mocking #155

Merged
merged 1 commit into from
Feb 10, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 7 additions & 7 deletions spot_wrapper/testing/grpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
import grpc
from bosdyn.api.header_pb2 import CommonError

from spot_wrapper.testing.helpers import ForwardingWrapper
from spot_wrapper.testing.helpers import ForwardingWrapper, cache1


def implemented(function: typing.Callable) -> bool:
Expand Down Expand Up @@ -273,9 +273,9 @@ class AutoCompletingStreamUnaryRpcHandler(ForwardingWrapper):
"""

def __call__(self, request_iterator: typing.Iterator, context: grpc.ServicerContext) -> typing.Any:
*head_requests, tail_request = request_iterator
response = self.__wrapped__(iter([*head_requests, tail_request]), context)
fill_response_header(tail_request, response)
cached_request_iterator = cache1(request_iterator)
response = self.__wrapped__(cached_request_iterator, context)
fill_response_header(cached_request_iterator.cache, response)
return response


Expand All @@ -295,9 +295,9 @@ class AutoCompletingStreamStreamRpcHandler(ForwardingWrapper):
"""

def __call__(self, request_iterator: typing.Iterator, context: grpc.ServicerContext) -> typing.Iterator:
*head_requests, tail_request = request_iterator
for response in self.__wrapped__(iter([*head_requests, tail_request]), context):
fill_response_header(tail_request, response)
cached_request_iterator = cache1(request_iterator)
for response in self.__wrapped__(cached_request_iterator, context):
fill_response_header(cached_request_iterator.cache, response)
yield response


Expand Down
23 changes: 23 additions & 0 deletions spot_wrapper/testing/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,3 +39,26 @@ def walk_resource_tree(resource_tree: ResourceTree) -> typing.Iterable[ResourceT
yield resource_tree
for subtree in resource_tree.sub_resources:
yield from walk_resource_tree(subtree)


_T = typing.TypeVar("_T")


class cache1(typing.Iterator[_T]):
"""Iterator wrapper that caches the last item retrieved."""

def __init__(self, inner: typing.Iterator[_T]):
"""Initialize cached iterator

Args:
inner: inner iterator to cache
"""
self.__inner = inner
self.cache: typing.Optional[_T] = None

def __iter__(self) -> "cache1":
return self

def __next__(self) -> _T:
self.cache = next(self.__inner)
return self.cache
16 changes: 11 additions & 5 deletions spot_wrapper/testing/mocks/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,14 +59,18 @@
from bosdyn.api.power_service_pb2_grpc import (
PowerServiceServicer as PowerCommandServiceServicer,
)
from bosdyn.api.robot_command_service_pb2_grpc import RobotCommandServiceServicer
from bosdyn.api.robot_command_service_pb2_grpc import (
RobotCommandServiceServicer,
RobotCommandStreamingServiceServicer,
)
from bosdyn.api.robot_id_service_pb2_grpc import RobotIdServiceServicer
from bosdyn.api.robot_state_service_pb2_grpc import RobotStateServiceServicer
from bosdyn.api.robot_state_service_pb2_grpc import (
RobotStateServiceServicer,
RobotStateStreamingServiceServicer,
)
from bosdyn.api.spot.choreography_service_pb2_grpc import ChoreographyServiceServicer
from bosdyn.api.spot.door_service_pb2_grpc import DoorServiceServicer
from bosdyn.api.spot.inverse_kinematics_service_pb2_grpc import (
InverseKinematicsServiceServicer,
)
from bosdyn.api.spot.inverse_kinematics_service_pb2_grpc import InverseKinematicsServiceServicer
from bosdyn.api.spot.spot_check_service_pb2_grpc import SpotCheckServiceServicer
from bosdyn.api.spot_cam.service_pb2_grpc import (
AudioServiceServicer,
Expand Down Expand Up @@ -145,8 +149,10 @@ class BaseMockSpot(
PtzServiceServicer,
RemoteMissionServiceServicer,
RobotCommandServiceServicer,
RobotCommandStreamingServiceServicer,
RobotIdServiceServicer,
RobotStateServiceServicer,
RobotStateStreamingServiceServicer,
SpotCheckServiceServicer,
StreamQualityServiceServicer,
TimeSyncServiceServicer,
Expand Down