diff --git a/config.yaml b/config.yaml index 9c1927c..a5ce08b 100644 --- a/config.yaml +++ b/config.yaml @@ -16,7 +16,11 @@ detection: high_angle: 170 # degrees rotate_speed: 5 +clustering: + max_cluster_distance: 0.2 # metres + data_merge: + merge_data_type: "OBSTACLES" # "OBSTACLES" or "DETECTIONS" delay: 0.1 # seconds decision: diff --git a/main.py b/main.py index e51bfad..1973d96 100644 --- a/main.py +++ b/main.py @@ -8,6 +8,7 @@ import yaml +from modules.clustering import clustering_worker from modules.data_merge import data_merge_worker from modules.decision import decision_worker from modules.detection import detection_worker @@ -55,6 +56,9 @@ def main() -> int: HIGH_ANGLE = config["detection"]["high_angle"] ROTATE_SPEED = config["detection"]["rotate_speed"] + MAX_CLUSTER_DISTANCE = config["clustering"]["max_cluster_distance"] + + MERGE_DATA_TYPE = config["data_merge"]["merge_data_type"] DELAY = config["data_merge"]["delay"] OBJECT_PROXIMITY_LIMIT = config["decision"]["object_proximity_limit"] @@ -71,6 +75,11 @@ def main() -> int: flight_interface_to_data_merge_queue = queue_wrapper.QueueWrapper(mp_manager, QUEUE_MAX_SIZE) detection_to_data_merge_queue = queue_wrapper.QueueWrapper(mp_manager, QUEUE_MAX_SIZE) + detection_to_clustering_queue = queue_wrapper.QueueWrapper(mp_manager, QUEUE_MAX_SIZE) + clustering_to_cluster_classification_queue = queue_wrapper.QueueWrapper( + mp_manager, QUEUE_MAX_SIZE + ) + obstacle_to_data_merge_queue = queue_wrapper.QueueWrapper(mp_manager, QUEUE_MAX_SIZE) merged_to_decision_queue = queue_wrapper.QueueWrapper(mp_manager, QUEUE_MAX_SIZE) command_to_flight_interface_queue = queue_wrapper.QueueWrapper(mp_manager, QUEUE_MAX_SIZE) @@ -97,15 +106,30 @@ def main() -> int: HIGH_ANGLE, ROTATE_SPEED, detection_to_data_merge_queue, + detection_to_clustering_queue, + controller, + ), + ) + + clustering_process = mp.Process( + target=clustering_worker.clustering_worker, + args=( + MAX_CLUSTER_DISTANCE, + detection_to_clustering_queue, + clustering_to_cluster_classification_queue, controller, ), ) + # cluster_classification_process will go here. + data_merge_process = mp.Process( target=data_merge_worker.data_merge_worker, args=( + MERGE_DATA_TYPE, DELAY, detection_to_data_merge_queue, + obstacle_to_data_merge_queue, flight_interface_to_data_merge_queue, merged_to_decision_queue, controller, @@ -127,6 +151,7 @@ def main() -> int: # Run flight_interface_process.start() detection_process.start() + clustering_process.start() data_merge_process.start() decision_process.start() @@ -141,11 +166,14 @@ def main() -> int: # Teardown flight_interface_to_data_merge_queue.fill_and_drain_queue() detection_to_data_merge_queue.fill_and_drain_queue() + detection_to_clustering_queue.fill_and_drain_queue() + clustering_to_cluster_classification_queue.fill_and_drain_queue() merged_to_decision_queue.fill_and_drain_queue() command_to_flight_interface_queue.fill_and_drain_queue() flight_interface_process.join() detection_process.join() + clustering_process.join() data_merge_process.join() decision_process.join() diff --git a/modules/data_merge/data_merge_worker.py b/modules/data_merge/data_merge_worker.py index 00e1990..b251738 100644 --- a/modules/data_merge/data_merge_worker.py +++ b/modules/data_merge/data_merge_worker.py @@ -2,20 +2,34 @@ Merges local drone odometry with LiDAR detections """ +import enum import queue import time -from worker import queue_wrapper -from worker import worker_controller - from modules import detections_and_odometry from modules import drone_odometry_local from modules import lidar_detection +from modules import obstacle +from modules import obstacles_and_odometry + +from worker import queue_wrapper +from worker import worker_controller + + +class MergeDataType(enum.Enum): + """ + Types of data to merge with odometry. + """ + + OBSTACLES = 0 + DETECTIONS = 0 def data_merge_worker( delay: float, + merge_data_type: str, detection_input_queue: queue_wrapper.QueueWrapper, + obstacle_input_queue: queue_wrapper.QueueWrapper, odometry_input_queue: queue_wrapper.QueueWrapper, output_queue: queue_wrapper.QueueWrapper, controller: worker_controller.WorkerController, @@ -27,28 +41,64 @@ def data_merge_worker( detection_input_queue, odometry_input_queue, output_queue are data queues. controller is how the main process communicates to this worker process. """ + if merge_data_type == "OBSTACLES": + merge_data_type = MergeDataType.OBSTACLES + elif merge_data_type == "DETECTIONS": + merge_data_type = MergeDataType.DETECTIONS + detections = [] + obstacles = [] while not controller.is_exit_requested(): controller.check_pause() - try: - detection: lidar_detection.LidarDetection = detection_input_queue.queue.get_nowait() - detections.append(detection) - except queue.Empty: - time.sleep(delay) + if merge_data_type == MergeDataType.OBSTACLES: + try: + new_obstacle: obstacle.Obstacle = obstacle_input_queue.queue.get_nowait() + obstacles.append(new_obstacle) + except queue.Empty: + if len(obstacles) == 0: + continue + time.sleep(delay) - try: - odometry: drone_odometry_local.DroneOdometryLocal = ( - odometry_input_queue.queue.get_nowait() + try: + odometry: drone_odometry_local.DroneOdometryLocal = ( + odometry_input_queue.queue.get_nowait() + ) + except queue.Empty: + continue + + result, merged = obstacles_and_odometry.ObstaclesAndOdometry.create( + detections, odometry ) + if not result: + continue + obstacles = [] + + elif merge_data_type == MergeDataType.DETECTIONS: + try: + detection: lidar_detection.LidarDetection = detection_input_queue.queue.get_nowait() + detections.append(detection) + except queue.Empty: + if len(detections) == 0: + continue + time.sleep(delay) - except queue.Empty: - continue + try: + odometry: drone_odometry_local.DroneOdometryLocal = ( + odometry_input_queue.queue.get_nowait() + ) - result, merged = detections_and_odometry.DetectionsAndOdometry.create(detections, odometry) + except queue.Empty: + continue - if not result: - continue + result, merged = detections_and_odometry.DetectionsAndOdometry.create( + detections, odometry + ) + if not result: + continue + detections = [] + else: + # log error + return - detections = [] output_queue.queue.put(merged) diff --git a/modules/decision/decision.py b/modules/decision/decision.py index 3d87b81..46bca07 100644 --- a/modules/decision/decision.py +++ b/modules/decision/decision.py @@ -8,6 +8,7 @@ from .. import decision_command from .. import detections_and_odometry from .. import drone_odometry_local +from .. import obstacles_and_odometry class Decision: @@ -20,14 +21,14 @@ def __init__(self, proximity_limit: float, max_history: int, command_timeout: fl Initialize current drone state and its lidar detections list. """ self.proximity_limit = proximity_limit - self.detections_and_odometries = deque(maxlen=max_history) + self.merged_odometries = deque(maxlen=max_history) self.command_timeout = command_timeout self.__command_requested = False self.__last_command_sent = None def run_simple_decision( self, - detections_and_odometries: "deque[detections_and_odometry.DetectionsAndOdometry]", + merged_odometries: "deque[detections_and_odometry.DetectionsAndOdometry]", proximity_limit: float, current_flight_mode: drone_odometry_local.FlightMode, ) -> "tuple[bool, decision_command.DecisionCommand | None]": @@ -35,7 +36,7 @@ def run_simple_decision( Runs simple collision avoidance where drone will stop within a set distance of an object. """ start_time = 0 - for lidar_scan_and_odometry in detections_and_odometries: + for lidar_scan_and_odometry in merged_odometries: detections = lidar_scan_and_odometry.detections if self.__command_requested and self.__last_command_sent == current_flight_mode: @@ -73,14 +74,29 @@ def run_simple_decision( ) return False, None + def run_obstacle_avoidance( + self, obstacles: "deque[obstacles_and_odometry.ObstaclesAndOdometry]" + ) -> "tuple[False, None]": + """ + Run obstacle avoidance algorithm. + """ + # TODO + print(obstacles) + return False, None + def run( - self, merged_data: detections_and_odometry.DetectionsAndOdometry + self, + merged_data: "detections_and_odometry.DetectionsAndOdometry | obstacles_and_odometry.ObstaclesAndOdometry", ) -> "tuple[bool, decision_command.DecisionCommand | None]": """ Run obstacle avoidance. """ current_flight_mode = merged_data.odometry.flight_mode - self.detections_and_odometries.append(merged_data) - return self.run_simple_decision( - self.detections_and_odometries, self.proximity_limit, current_flight_mode - ) + self.merged_odometries.append(merged_data) + if str(type(merged_data)) == "": + return self.run_simple_decision( + self.merged_odometries, self.proximity_limit, current_flight_mode + ) + if str(type(merged_data)) == "": + return self.run_obstacle_avoidance(self.merged_odometries) + return False, None diff --git a/modules/decision/decision_worker.py b/modules/decision/decision_worker.py index acaa64a..ce449c1 100644 --- a/modules/decision/decision_worker.py +++ b/modules/decision/decision_worker.py @@ -3,6 +3,7 @@ """ from modules import detections_and_odometry +from modules import obstacle from worker import queue_wrapper from worker import worker_controller from . import decision @@ -13,6 +14,7 @@ def decision_worker( max_history: int, command_timeout: float, merged_in_queue: queue_wrapper.QueueWrapper, + obstacle_in_queue: queue_wrapper.QueueWrapper, command_out_queue: queue_wrapper.QueueWrapper, controller: worker_controller.WorkerController, ) -> None: @@ -29,13 +31,23 @@ def decision_worker( while not controller.is_exit_requested(): controller.check_pause() - merged_data: detections_and_odometry.DetectionsAndOdometry = merged_in_queue.queue.get() + merged_data: detections_and_odometry.DetectionsAndOdometry = ( + merged_in_queue.queue.get_nowait() + ) if merged_data is None: break + obstacle_data: obstacle.Obstacle = obstacle_in_queue.queue.get_nowait() + if obstacle_data is None: + break + result, value = decider.run(merged_data) if not result: continue + result, value = decider.run_obstacle_avoidance(obstacle_data) + if not result: + continue + print(f"Decision: Command sent: {value.command}") command_out_queue.queue.put(value) diff --git a/modules/detection/detection_worker.py b/modules/detection/detection_worker.py index 7908578..ad10a04 100644 --- a/modules/detection/detection_worker.py +++ b/modules/detection/detection_worker.py @@ -15,7 +15,8 @@ def detection_worker( low_angle: float, high_angle: float, rotate_speed: int, - output_queue: queue_wrapper.QueueWrapper, + detection_to_clustering_queue: queue_wrapper.QueueWrapper, + detection_to_data_merge_queue: queue_wrapper.QueueWrapper, controller: worker_controller.WorkerController, ) -> None: """ @@ -46,4 +47,5 @@ def detection_worker( if not result: continue - output_queue.queue.put(value) + detection_to_clustering_queue.queue.put(value) + detection_to_data_merge_queue.queue.put(value)