-
Notifications
You must be signed in to change notification settings - Fork 715
/
Copy path__init__.py
89 lines (71 loc) · 2.64 KB
/
__init__.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
#!/usr/bin/env python3
"""
Top-level module of the BehaviorTree.CPP Python bindings.
"""
# re-export
from btpy_cpp import (
BehaviorTreeFactory,
NodeStatus,
StatefulActionNode,
SyncActionNode,
Tree,
)
def ports(inputs: "list[str] | None" = None, outputs: "list[str] | None" = None):
    """Class decorator declaring the input and output ports of an action node.

    The port names are stored on the decorated class as the attributes
    `input_ports` and `output_ports`.

    Args:
        inputs: Names of the input ports. Omitted or `None` means no inputs.
        outputs: Names of the output ports. Omitted or `None` means no outputs.

    Returns:
        A decorator that sets both attributes on the class it receives and
        returns that same class.
    """
    # `None` sentinels replace the former `[]` defaults: a mutable default is a
    # single object shared by every call. The annotations are quoted so that
    # the `X | None` syntax is never evaluated on interpreters older than 3.10.
    # Snapshot the names now so that later mutation of the caller's lists
    # cannot change what the decorator applies.
    input_names = [] if inputs is None else list(inputs)
    output_names = [] if outputs is None else list(outputs)

    def specify_ports(cls):
        # Give every decorated class its own list objects, so that editing one
        # class's ports never leaks into another class.
        cls.input_ports = list(input_names)
        cls.output_ports = list(output_names)
        return cls

    return specify_ports
class AsyncActionNode(StatefulActionNode):
    """An abstract action node implemented via cooperative multitasking.

    Subclasses must implement the `run()` method as a generator. Optionally,
    this method can return a final `NodeStatus` value to indicate its exit
    condition.

    Optionally, subclasses can override the `on_halted()` method which is called
    when the tree halts. The default implementation does nothing. The `run()`
    method will never be called again after a halt.

    Note:
        It is the responsibility of the action author to not block the main
        behavior tree loop with long-running tasks. `yield` calls should be
        placed whenever a pause is appropriate.
    """

    # Generator returned by `run()`. It stays `None` until `on_start()` has
    # been called. Defining the default on the class guarantees the attribute
    # always exists, so the guard in `on_running()` can actually detect a
    # missing `on_start()` instead of failing with an `AttributeError`.
    coroutine = None

    def __init__(self, name, config):
        """Forward `name` and `config` unchanged to the base class."""
        super().__init__(name, config)

    def on_start(self) -> NodeStatus:
        """Create a fresh generator from `run()` and report `RUNNING`.

        The generator body does not execute here; it is first advanced by the
        next `on_running()` call.
        """
        self.coroutine = self.run()
        return NodeStatus.RUNNING

    def on_running(self) -> NodeStatus:
        """Advance the generator by one step and report the node status.

        Returns:
            `NodeStatus.RUNNING` while the generator keeps yielding. Once it is
            exhausted, the value it returned, or `NodeStatus.SUCCESS` if it
            returned nothing.

        Raises:
            RuntimeError: If called before `on_start()` created the generator.
        """
        # The library logic should never allow this to happen, but users can
        # still manually call `on_running` without an associated `on_start`
        # call. Raise a real exception: raising a plain string is itself a
        # `TypeError` in Python 3 and would hide the message.
        if self.coroutine is None:
            raise RuntimeError("AsyncActionNode run without starting")

        # Resume the coroutine (generator). As long as the generator is not
        # exhausted, keep this action in the RUNNING state.
        try:
            next(self.coroutine)
        except StopIteration as e:
            # If the action returned a status then propagate it upwards.
            if e.value is not None:
                return e.value
            # Otherwise, just assume the action finished successfully.
            return NodeStatus.SUCCESS
        return NodeStatus.RUNNING

    def on_halted(self):
        """Hook invoked on halt; the default implementation does nothing."""
        pass
# Public API of the package: these are the names bound by
# `from btpy import *`, following the mechanism described in [1].
#
# [1]: https://docs.python.org/3/tutorial/modules.html#importing-from-a-package
__all__ = [
    # Defined in this module.
    "ports",
    "AsyncActionNode",
    # Re-exported from `btpy_cpp`.
    "BehaviorTreeFactory",
    "NodeStatus",
    "StatefulActionNode",
    "SyncActionNode",
    "Tree",
]