-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmain.py
73 lines (64 loc) · 2.44 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
from multiprocessing import Manager
import queue
import threading
import asyncio
from src.logger import log
from src.config import config
from src.services.status_server import status_server
from src.services.milvus_service import milvus_service
from src.services.nats_client import nats_client
from src.services.api_server import api_server
if __name__ == '__main__':
execution_queue = queue.Queue()
startup_queue = queue.Queue()
shared_stats = {
"nats-ready": not config.NATS_ENABLED,
"nats-alive": not config.NATS_ENABLED,
"milvus-ready": False,
"milvus-alive": False
}
def shutdown_services(servie_name):
log.error(f"Service{servie_name} requested shutdown")
exit(1)
for i in range(config.MILVUS_WORKERS):
log.info(f"Starting Milvus worker #{i + 1}")
milvus_thread = threading.Thread(
target=milvus_service.start_milvus_service,
args=(shared_stats, execution_queue, startup_queue, i,),
daemon=True
)
milvus_thread.start()
log.info(f"Waiting for Milvus worker #{i + 1} to be ready")
thread_status = startup_queue.get()
if not thread_status.get("success", False):
log.error(thread_status.get("error", "Unknown error"))
exit(1)
if config.NATS_ENABLED:
# Run the NATS client asynchronously
log.info("Starting Nats client")
asyncio_thread = threading.Thread(
target=asyncio.run,
args=(nats_client.start_nats_client(shared_stats, execution_queue, startup_queue),),
daemon=True
)
asyncio_thread.start()
log.info("Waiting for nats to be ready")
thread_status = startup_queue.get()
if not thread_status.get("success", False):
log.error(thread_status.get("error", "Unknown error"))
exit(1)
if config.API_SERVER_ENABLED:
# Run the Rest API server asynchronously
log.info("Starting API server")
api_ready_event = threading.Event()
asyncio_thread = threading.Thread(
target=api_server.run_fastapi_app,
args=(execution_queue, api_ready_event,),
daemon=True
)
asyncio_thread.start()
log.info("Waiting for API server to start")
api_ready_event.wait()
# Run the Flask app in a separate thread
log.info("Starting Status server")
status_server.run_flask_app(shared_stats)