Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement tests for #130. #132

Merged
merged 3 commits into from
Aug 26, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
147 changes: 145 additions & 2 deletions spot_wrapper/tests/test_wrapper.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
# Copyright (c) 2023 Boston Dynamics AI Institute LLC. See LICENSE file for more info.

import logging
from typing import Iterator
import time
from typing import Iterator, List

import grpc
import pytest
from bosdyn.api.basic_command_pb2 import RobotCommandFeedbackStatus, SitCommand, StandCommand
from bosdyn.api.manipulation_api_pb2 import (
ManipulationApiRequest,
ManipulationApiResponse,
Expand All @@ -14,7 +16,7 @@
PowerCommandResponse,
PowerCommandStatus,
)
from bosdyn.api.robot_command_pb2 import RobotCommandResponse
from bosdyn.api.robot_command_pb2 import RobotCommandFeedbackResponse, RobotCommandResponse
from bosdyn.api.robot_state_pb2 import PowerState

import spot_wrapper.testing
Expand Down Expand Up @@ -120,3 +122,144 @@ def test_spot_wrapper_commands(simple_spot: SpotFixture, simple_spot_wrapper: Sp
assert not ok
ok, _ = simple_spot_wrapper.sit()
assert not ok


def test_mobility_command_feedback(simple_spot: SpotFixture, simple_spot_wrapper: SpotWrapper) -> None:
non_processing_feedback_statuses: List[RobotCommandFeedbackStatus] = [
RobotCommandFeedbackStatus.STATUS_UNKNOWN,
RobotCommandFeedbackStatus.STATUS_COMMAND_OVERRIDDEN,
RobotCommandFeedbackStatus.STATUS_COMMAND_TIMED_OUT,
RobotCommandFeedbackStatus.STATUS_ROBOT_FROZEN,
RobotCommandFeedbackStatus.STATUS_INCOMPATIBLE_HARDWARE,
]

# bosdyn.api.RobotCommandService/RobotCommand implementation is mocked
# (as spot_wrapper.testing.mocks.MockSpot is automatically specified),
# so here we provide the same response for every future command.
response = RobotCommandResponse()
response.status = RobotCommandResponse.Status.STATUS_OK
simple_spot.api.RobotCommand.future.returns(response).forever()

def update_with_feedback_response(r: RobotCommandFeedbackResponse) -> None:
simple_spot.api.RobotCommandFeedback.future.returns(r)
simple_spot_wrapper.updateTasks()
time.sleep(0.1)

def assert_non_processing_stand_status(should_stand: bool) -> None:
for status in non_processing_feedback_statuses:
simple_spot_wrapper.stand()
assert simple_spot_wrapper.last_stand_command is not None
assert simple_spot_wrapper.is_standing == should_stand
r = RobotCommandFeedbackResponse()
r.feedback.synchronized_feedback.mobility_command_feedback.status = status
update_with_feedback_response(r)
assert simple_spot_wrapper.last_stand_command is None
assert simple_spot_wrapper.is_standing == should_stand

def assert_non_processing_sit_status(should_sit: bool) -> None:
for status in non_processing_feedback_statuses:
simple_spot_wrapper.sit()
assert simple_spot_wrapper.last_sit_command is not None
assert simple_spot_wrapper.is_sitting == should_sit
r = RobotCommandFeedbackResponse()
r.feedback.synchronized_feedback.mobility_command_feedback.status = status
update_with_feedback_response(r)
assert simple_spot_wrapper.last_sit_command is None
assert simple_spot_wrapper.is_sitting == should_sit

# initial standing state
assert simple_spot_wrapper.last_stand_command is None
assert not simple_spot_wrapper.is_standing

# stand command with non-processing status - should reset the command and leave state unchanged
assert_non_processing_stand_status(simple_spot_wrapper.is_standing)

# stand command with processing status
simple_spot_wrapper.stand()
assert simple_spot_wrapper.last_stand_command is not None
assert not simple_spot_wrapper.is_standing
response = RobotCommandFeedbackResponse()
response.feedback.synchronized_feedback.mobility_command_feedback.status = (
RobotCommandFeedbackStatus.STATUS_PROCESSING
)
# command in progress
response.feedback.synchronized_feedback.mobility_command_feedback.stand_feedback.status = (
StandCommand.Feedback.STATUS_IN_PROGRESS
)
update_with_feedback_response(response)
assert simple_spot_wrapper.last_stand_command is not None
assert not simple_spot_wrapper.is_standing
assert not simple_spot_wrapper.is_sitting
# command finished
response.feedback.synchronized_feedback.mobility_command_feedback.stand_feedback.status = (
StandCommand.Feedback.STATUS_IS_STANDING
)
update_with_feedback_response(response)
assert simple_spot_wrapper.last_stand_command is None
assert simple_spot_wrapper.is_standing
assert not simple_spot_wrapper.is_sitting

# stand command with non-processing status - should reset the command and leave state unchanged
assert_non_processing_stand_status(simple_spot_wrapper.is_standing)

# initial sitting state
assert simple_spot_wrapper.last_sit_command is None
assert not simple_spot_wrapper.is_sitting

# sit command with non-processing status - should reset the command and leave state unchanged
assert_non_processing_sit_status(simple_spot_wrapper.is_sitting)

# sit command with processing status
simple_spot_wrapper.sit()
assert simple_spot_wrapper.last_sit_command is not None
assert not simple_spot_wrapper.is_sitting
response = RobotCommandFeedbackResponse()
response.feedback.synchronized_feedback.mobility_command_feedback.status = (
RobotCommandFeedbackStatus.STATUS_PROCESSING
)
# command in progress
response.feedback.synchronized_feedback.mobility_command_feedback.sit_feedback.status = (
SitCommand.Feedback.STATUS_IN_PROGRESS
)
update_with_feedback_response(response)
assert simple_spot_wrapper.last_sit_command is not None
assert not simple_spot_wrapper.is_sitting
assert not simple_spot_wrapper.is_standing
# command finished
response.feedback.synchronized_feedback.mobility_command_feedback.sit_feedback.status = (
SitCommand.Feedback.STATUS_IS_SITTING
)
update_with_feedback_response(response)
assert simple_spot_wrapper.last_sit_command is None
assert simple_spot_wrapper.is_sitting
assert not simple_spot_wrapper.is_standing

# sit command with non-processing status - should reset the command and leave state unchanged
assert_non_processing_sit_status(simple_spot_wrapper.is_sitting)


def test_robot_movement_states(simple_spot: SpotFixture, simple_spot_wrapper: SpotWrapper) -> None:
# bosdyn.api.RobotCommandService/RobotCommand implementation is mocked
# (as spot_wrapper.testing.mocks.MockSpot is automatically specified),
# so here we provide the same response for every future command.
response = RobotCommandResponse()
response.status = RobotCommandResponse.Status.STATUS_OK
simple_spot.api.RobotCommand.future.returns(response).forever()

assert simple_spot_wrapper.last_sit_command is None
assert simple_spot_wrapper.last_stand_command is None
assert simple_spot_wrapper.last_velocity_command_time is None
assert simple_spot_wrapper.is_sitting
assert not simple_spot_wrapper.is_standing
assert not simple_spot_wrapper.is_moving

simple_spot_wrapper.velocity_cmd(0, 0, 0)
assert simple_spot_wrapper.last_velocity_command_time is not None
simple_spot_wrapper.updateTasks()
time.sleep(0.1)

assert simple_spot_wrapper.last_sit_command is None
assert simple_spot_wrapper.last_stand_command is None
assert not simple_spot_wrapper.is_sitting
assert simple_spot_wrapper.is_standing
assert simple_spot_wrapper.is_moving
khughes-bdai marked this conversation as resolved.
Show resolved Hide resolved
Loading