Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add basic Spot CAM mocks #119

Merged
merged 1 commit into from
Jun 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions spot_wrapper/testing/mocks/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,14 +86,13 @@
from spot_wrapper.testing.grpc import AutoServicer, collect_method_handlers
from spot_wrapper.testing.helpers import enforce_matching_headers
from spot_wrapper.testing.mocks.auth import MockAuthService
from spot_wrapper.testing.mocks.cam import MockCAMService
from spot_wrapper.testing.mocks.directory import MockDirectoryService
from spot_wrapper.testing.mocks.estop import MockEStopService
from spot_wrapper.testing.mocks.keepalive import MockKeepaliveService
from spot_wrapper.testing.mocks.lease import MockLeaseService
from spot_wrapper.testing.mocks.license import MockLicenseService
from spot_wrapper.testing.mocks.payload_registration import (
MockPayloadRegistrationService,
)
from spot_wrapper.testing.mocks.payload import MockPayloadService
from spot_wrapper.testing.mocks.power import MockPowerService
from spot_wrapper.testing.mocks.robot_id import MockRobotIdService
from spot_wrapper.testing.mocks.robot_state import MockRobotStateService
Expand Down Expand Up @@ -180,12 +179,13 @@ def __init__(self, *args: typing.Any, **kwargs: typing.Any) -> None:
class MockSpot(
BaseMockSpot,
MockAuthService,
MockCAMService,
MockDirectoryService,
MockEStopService,
MockKeepaliveService,
MockLeaseService,
MockLicenseService,
MockPayloadRegistrationService,
MockPayloadService,
MockPowerService,
MockRobotIdService,
MockRobotStateService,
Expand Down
20 changes: 20 additions & 0 deletions spot_wrapper/testing/mocks/cam.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# Copyright (c) 2024 Boston Dynamics AI Institute LLC. See LICENSE file for more info.

import typing

import grpc
from bosdyn.api.spot_cam.ptz_pb2 import ListPtzRequest, ListPtzResponse, PtzDescription
from bosdyn.api.spot_cam.service_pb2_grpc import PtzServiceServicer


class MockCAMService(PtzServiceServicer):
"""Mock Spot CAM services."""

def __init__(self, **kwargs: typing.Any) -> None:
super().__init__(**kwargs)
self.ptzs: typing.List[PtzDescription] = []

def ListPtz(self, request: ListPtzRequest, context: grpc.ServicerContext) -> ListPtzResponse:
response = ListPtzResponse()
response.ptzs.extend(self.ptzs)
return response
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import typing

import grpc
from bosdyn.api.payload_pb2 import Payload
from bosdyn.api.payload_pb2 import ListPayloadsRequest, ListPayloadsResponse, Payload
from bosdyn.api.payload_registration_pb2 import (
GetPayloadAuthTokenRequest,
GetPayloadAuthTokenResponse,
Expand All @@ -17,22 +17,26 @@
from bosdyn.api.payload_registration_service_pb2_grpc import (
PayloadRegistrationServiceServicer,
)
from bosdyn.api.payload_service_pb2_grpc import PayloadServiceServicer


class MockPayloadRegistrationService(PayloadRegistrationServiceServicer):
class MockPayloadService(PayloadRegistrationServiceServicer, PayloadServiceServicer):
"""
A mock Spot payload registration service.
A mock Spot payload registration and listing service.

It bookkeeps all payloads but enforces nothing.
"""

def __init__(self, **kwargs: typing.Any) -> None:
super().__init__(**kwargs)
self._payloads: typing.Dict[str, Payload] = {}
self.payloads: typing.Dict[str, Payload] = {
"spotcam": Payload(GUID="spotcam", name="Spot CAM", is_enabled=True)
}

@property
def payloads(self) -> typing.Iterable[Payload]:
return list(self._payloads.values())
def ListPayloads(self, request: ListPayloadsRequest, context: grpc.ServicerContext) -> ListPayloadsResponse:
response = ListPayloadsResponse()
response.payloads.extend(self.payloads.values())
return response

def RegisterPayload(
self, request: RegisterPayloadRequest, context: grpc.ServicerContext
Expand Down