Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cluster estimation integration test #251

Open
wants to merge 8 commits into
base: main
Choose a base branch
from
142 changes: 142 additions & 0 deletions tests/integration/test_cluster_estimation_worker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
"""
Test cluster_estimation_worker process.
"""

import time
import multiprocessing as mp
from typing import List
import numpy as np
from utilities.workers import queue_proxy_wrapper, worker_controller
from modules.detection_in_world import DetectionInWorld
from modules.cluster_estimation.cluster_estimation_worker import cluster_estimation_worker
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.



def test_cluster_estimation_worker() -> int:
"""
Integration test for cluster estimation worker.
"""

# Worker and controller setup.
controller = worker_controller.WorkerController()

mp_manager = mp.Manager()
input_queue = queue_proxy_wrapper.QueueProxyWrapper(mp_manager)
output_queue = queue_proxy_wrapper.QueueProxyWrapper(mp_manager)

worker_process = mp.Process(
target=cluster_estimation_worker,
args=(
3,
0,
3,
0,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Try to store numerical constants like these at the top (ie RANDOM_STATE = 0, see other integration tests as an example).

input_queue,
output_queue,
controller,
),
)

# Second test set: 1 clusters
test_data_1 = [
# Landing pad 1
DetectionInWorld.create(
np.array([[1, 1], [1, 2], [2, 2], [2, 1]]), np.array([1.5, 1.5]), 1, 0.9
)[1],
DetectionInWorld.create(
np.array([[1, 1], [1, 2], [2, 2], [2, 1]]), np.array([1.5, 1.5]), 1, 0.9
)[1],
DetectionInWorld.create(
np.array([[1, 1], [1, 2], [2, 2], [2, 1]]), np.array([1.5, 1.5]), 1, 0.9
)[1],
DetectionInWorld.create(
np.array([[1, 1], [1, 2], [2, 2], [2, 1]]), np.array([1.5, 1.5]), 1, 0.9
)[1],
DetectionInWorld.create(
np.array([[1, 1], [1, 2], [2, 2], [2, 1]]), np.array([1.5, 1.5]), 1, 0.9
)[1],
]

# First test set: 2 clusters
test_data_2 = [
# Landing pad 1
DetectionInWorld.create(
np.array([[1, 1], [1, 2], [2, 2], [2, 1]]), np.array([1.5, 1.5]), 1, 0.9
)[1],
DetectionInWorld.create(
np.array([[1, 1], [1, 2], [2, 2], [2, 1]]), np.array([1.5, 1.5]), 1, 0.9
)[1],
DetectionInWorld.create(
np.array([[1, 1], [1, 2], [2, 2], [2, 1]]), np.array([1.5, 1.5]), 1, 0.9
)[1],
DetectionInWorld.create(
np.array([[1, 1], [1, 2], [2, 2], [2, 1]]), np.array([1.5, 1.5]), 1, 0.9
)[1],
DetectionInWorld.create(
np.array([[1, 1], [1, 2], [2, 2], [2, 1]]), np.array([1.5, 1.5]), 1, 0.9
)[1],
# Landing pad 2
DetectionInWorld.create(
np.array([[10, 10], [10, 11], [11, 11], [11, 10]]), np.array([10.5, 10.5]), 1, 0.9
)[1],
DetectionInWorld.create(
np.array([[10.1, 10.1], [10.1, 11.1], [11.1, 11.1], [11.1, 10.1]]),
np.array([10.6, 10.6]),
1,
0.92,
)[1],
DetectionInWorld.create(
np.array([[9.9, 9.9], [9.9, 10.9], [10.9, 10.9], [10.9, 9.9]]),
np.array([10.4, 10.4]),
1,
0.88,
)[1],
DetectionInWorld.create(
np.array([[10.2, 10.2], [10.2, 11.2], [11.2, 11.2], [11.2, 10.2]]),
np.array([10.7, 10.7]),
1,
0.95,
)[1],
DetectionInWorld.create(
np.array([[10.3, 10.3], [10.3, 11.3], [11.3, 11.3], [11.3, 10.3]]),
np.array([10.8, 10.8]),
1,
0.93,
)[1],
]

# Testing with test_data_1 (1 cluster)

input_queue.queue.put(test_data_1)
worker_process.start()
time.sleep(1)

output_results: List[List[DetectionInWorld]] = output_queue.queue.get()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We want to check to see if the output result is a list of ObjectInWorld, otherwise the worker wouldnt be doing anything


assert output_results is not None
assert len(output_results) == 1

time.sleep(1)

# Testing with test_data_2 (2 clusters)

input_queue.queue.put(test_data_2)
time.sleep(1)

output_results: List[List[DetectionInWorld]] = output_queue.queue.get()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same here


assert output_results is not None
assert len(output_results) == 2

controller.request_exit()
input_queue.queue.put(None)
worker_process.join()

return 0


if __name__ == "__main__":
result_main = test_cluster_estimation_worker()
if result_main < 0:
print(f"ERROR: Status code: {result_main}")

print("Done!")