Skip to content

Reduce preempt stop time in stages #598

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Jul 19, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions core/include/moveit/task_constructor/stage_p.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,17 @@
namespace moveit {
namespace task_constructor {

/// exception thrown by StagePrivate::runCompute()
class PreemptStageException : public std::exception
{
public:
explicit PreemptStageException() {}
const char* what() const noexcept override {
static const char* msg = "";
return msg;
}
};

class ContainerBase;
class StagePrivate
{
Expand Down Expand Up @@ -146,6 +157,10 @@ class StagePrivate
bool storeFailures() const { return introspection_ != nullptr; }
void runCompute() {
ROS_DEBUG_STREAM_NAMED("Stage", fmt::format("Computing stage '{}'", name()));

if (preempted())
throw PreemptStageException();

auto compute_start_time = std::chrono::steady_clock::now();
try {
compute();
Expand All @@ -159,6 +174,10 @@ class StagePrivate
/** compute cost for solution through configured CostTerm */
void computeCost(const InterfaceState& from, const InterfaceState& to, SolutionBase& solution);

void setPreemptedCheck(const std::atomic<bool>* preempt_requested);
/// is the stage preempted ? defaults to false
bool preempted() const;

protected:
StagePrivate& operator=(StagePrivate&& other);

Expand Down Expand Up @@ -197,6 +216,8 @@ class StagePrivate
InterfaceWeakPtr next_starts_; // interface to be used for sendForward()

Introspection* introspection_; // task's introspection instance

const std::atomic<bool>* preempt_requested_;
};
PIMPL_FUNCTIONS(Stage)
std::ostream& operator<<(std::ostream& os, const StagePrivate& stage);
Expand Down
1 change: 1 addition & 0 deletions core/include/moveit/task_constructor/task.h
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ class Task : protected WrapperBase
/// interrupt current planning
void preempt();
void resetPreemptRequest();
bool isPreempted();
/// execute solution, return the result
moveit::core::MoveItErrorCode execute(const SolutionBase& s);

Expand Down
14 changes: 13 additions & 1 deletion core/src/stage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,8 @@ StagePrivate::StagePrivate(Stage* me, const std::string& name)
, cost_term_{ std::make_unique<CostTerm>() }
, total_compute_time_{}
, parent_{ nullptr }
, introspection_{ nullptr } {}
, introspection_{ nullptr }
, preempt_requested_{ nullptr } {}

StagePrivate& StagePrivate::operator=(StagePrivate&& other) {
assert(typeid(*this) == typeid(other));
Expand Down Expand Up @@ -305,6 +306,17 @@ void StagePrivate::computeCost(const InterfaceState& from, const InterfaceState&
}
}

void StagePrivate::setPreemptedCheck(const std::atomic<bool>* preempt_requested) {
preempt_requested_ = preempt_requested;
}

bool StagePrivate::preempted() const {
if (preempt_requested_)
return *preempt_requested_;

return false;
}

Stage::Stage(StagePrivate* impl) : pimpl_(impl) {
assert(impl);
auto& p = properties();
Expand Down
15 changes: 12 additions & 3 deletions core/src/task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -213,11 +213,12 @@ void Task::init() {
// task expects its wrapped child to push to both ends, this triggers interface resolution
stages()->pimpl()->resolveInterface(InterfaceFlags({ GENERATE }));

// provide introspection instance to all stages
// provide introspection instance and prempted requested to all stages
auto* introspection = impl->introspection_.get();
impl->traverseStages(
[introspection](Stage& stage, int /*depth*/) {
[introspection, impl](Stage& stage, int /*depth*/) {
stage.pimpl()->setIntrospection(introspection);
stage.pimpl()->setPreemptedCheck(&impl->preempt_requested_);
return true;
},
1, UINT_MAX);
Expand All @@ -232,7 +233,11 @@ bool Task::canCompute() const {
}

void Task::compute() {
stages()->pimpl()->runCompute();
try {
stages()->pimpl()->runCompute();
} catch (const PreemptStageException& e) {
// do nothing, needed for early stop
}
}

moveit::core::MoveItErrorCode Task::plan(size_t max_solutions) {
Expand Down Expand Up @@ -274,6 +279,10 @@ void Task::resetPreemptRequest() {
pimpl()->preempt_requested_ = false;
}

bool Task::isPreempted() {
return pimpl()->preempt_requested_;
}

moveit::core::MoveItErrorCode Task::execute(const SolutionBase& s) {
actionlib::SimpleActionClient<moveit_task_constructor_msgs::ExecuteTaskSolutionAction> ac("execute_task_solution");
if (!ac.waitForServer(ros::Duration(0.5))) {
Expand Down
4 changes: 4 additions & 0 deletions core/test/stage_mockups.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ struct GeneratorMockup : public Generator
void init(const moveit::core::RobotModelConstPtr& robot_model) override;
bool canCompute() const override;
void compute() override;
virtual void reset() override { runs_ = 0; };
};

struct MonitoringGeneratorMockup : public MonitoringGenerator
Expand All @@ -81,6 +82,7 @@ struct MonitoringGeneratorMockup : public MonitoringGenerator
bool canCompute() const override { return false; }
void compute() override {}
void onNewSolution(const SolutionBase& s) override;
virtual void reset() override { runs_ = 0; };
};

struct ConnectMockup : public Connecting
Expand All @@ -97,6 +99,7 @@ struct ConnectMockup : public Connecting
using Connecting::compatible; // make this accessible for testing

void compute(const InterfaceState& from, const InterfaceState& to) override;
virtual void reset() override { runs_ = 0; };
};

struct PropagatorMockup : public PropagatingEitherWay
Expand All @@ -113,6 +116,7 @@ struct PropagatorMockup : public PropagatingEitherWay

void computeForward(const InterfaceState& from) override;
void computeBackward(const InterfaceState& to) override;
virtual void reset() override { runs_ = 0; };
};

struct ForwardMockup : public PropagatorMockup
Expand Down
38 changes: 31 additions & 7 deletions core/test/test_container.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -674,21 +674,20 @@ TEST(Task, timeout) {
}

// https://github.com/moveit/moveit_task_constructor/pull/597
// https://github.com/moveit/moveit_task_constructor/pull/598
// start planning in another thread, then preempt it in this thread
TEST(Task, preempt) {
TEST_F(TaskTestBase, preempt) {
moveit::core::MoveItErrorCode ec;
resetMockupIds();

Task t;
t.setRobotModel(getModel());

auto timeout = std::chrono::milliseconds(10);
t.add(std::make_unique<GeneratorMockup>(PredefinedCosts::constant(0.0)));
t.add(std::make_unique<TimedForwardMockup>(timeout));
auto gen1 = add(t, new GeneratorMockup(PredefinedCosts::constant(0.0)));
auto fwd1 = add(t, new TimedForwardMockup(timeout));
auto fwd2 = add(t, new TimedForwardMockup(timeout));

// preempt before preempt_request_ is reset in plan()
{
std::thread thread{ [&ec, &t, timeout] {
std::thread thread{ [&ec, this, timeout] {
std::this_thread::sleep_for(timeout);
ec = t.plan(1);
} };
Expand All @@ -698,5 +697,30 @@ TEST(Task, preempt) {

EXPECT_EQ(ec, moveit::core::MoveItErrorCode::PREEMPTED);
EXPECT_EQ(t.solutions().size(), 0u);
EXPECT_EQ(gen1->runs_, 0u);
EXPECT_EQ(fwd1->runs_, 0u);
EXPECT_EQ(fwd2->runs_, 0u);
EXPECT_TRUE(t.plan(1)); // make sure the preempt request has been resetted on the previous call to plan()

// prempt in between stage computation
// Failing stage(s):
// 0 - ← 0 → - 0 / task pipeline
// 1 - ← 1 → - 0 / GEN1
// - 0 → 1 → - 1 / FWD1
// - 1 → 0 → - 0 / FWD2
t.reset();
resetMockupIds();
{
std::thread thread{ [&ec, this] { ec = t.plan(1); } };
std::this_thread::sleep_for(timeout);
t.preempt();
thread.join();
}

EXPECT_EQ(ec, moveit::core::MoveItErrorCode::PREEMPTED);
EXPECT_EQ(t.solutions().size(), 0u);
EXPECT_EQ(gen1->runs_, 1u);
EXPECT_EQ(fwd1->runs_, 1u);
EXPECT_EQ(fwd2->runs_, 0u);
EXPECT_TRUE(t.plan(1)); // make sure the preempt request has been resetted on the previous call to plan()
}
Loading