-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathmain.py
63 lines (45 loc) · 1.67 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
import argparse
import importlib
import psutil
import os
import config
import constants
from utils.Database import Database
def is_running(script):
    """Return True if another Python process appears to be running *script*.

    Every process reported by psutil is inspected. A process counts as a
    match when its name starts with ``python``, its command line has more
    than one entry, *script* occurs as a substring of the second entry, and
    it is not the current process.

    Processes that disappear while being inspected, or that the current
    user is not allowed to inspect, are skipped instead of aborting the scan.

    Args:
        script: Text to look for in the second command-line entry,
            e.g. ``'main.py'``.

    Returns:
        True if a matching process other than the current one is found,
        False otherwise.
    """
    own_pid = os.getpid()
    for proc in psutil.process_iter():
        if proc.pid == own_pid:
            continue
        try:
            if not proc.name().startswith('python'):
                continue
            # Read the command line once so the length check and the index
            # below operate on the same snapshot.
            cmdline = proc.cmdline()
        except (psutil.NoSuchProcess, psutil.AccessDenied):
            # The process exited mid-scan or cannot be read by this user;
            # either way it cannot be confirmed as a running instance.
            # NOTE(review): an instance owned by another user may therefore
            # go undetected -- confirm this matches the deployment setup.
            continue
        if len(cmdline) > 1 and script in cmdline[1]:
            return True
    return False
def get_next_job(db):
    """Fetch the next job this worker should process.

    Asks *db* for a job already in processing for this worker
    (``config.worker_id``) and version (``constants.version``). If none is
    returned, asks *db* to assign a job to the worker and queries once more.

    Args:
        db: Object providing ``get_next_processing_job_by_worker`` and
            ``assign_job_to_worker``.

    Returns:
        Whatever the final ``get_next_processing_job_by_worker`` call
        returns; the caller treats ``None`` as "no job available".
    """
    worker, version = config.worker_id, constants.version

    pending = db.get_next_processing_job_by_worker(worker, version)
    if pending is None:
        # Nothing in progress for this worker: request an assignment and
        # look again.
        db.assign_job_to_worker(worker, version)
        pending = db.get_next_processing_job_by_worker(worker, version)
    return pending
def process_job(db, job):
    """Run the strategy named by *job* and record its outcome through *db*.

    The strategy class is looked up dynamically: the module
    ``backtesting.strategies.<strategy>`` is imported and the attribute with
    the same name as the module is instantiated with ``job['params']`` as
    keyword arguments.

    Args:
        db: Object providing ``save_job_results`` and ``finish_job_by_id``.
        job: Mapping with the keys ``'id'``, ``'strategy'`` and ``'params'``.

    Returns:
        None. Results and the summed init + test execution time are handed
        to *db*.
    """
    print(f"Processing job {job['id']}")

    # Module and class share the strategy's name.
    strategy_name = job['strategy']
    strategy_module = importlib.import_module("backtesting.strategies." + strategy_name)
    strategy_cls = getattr(strategy_module, strategy_name)

    backtest = strategy_cls(**job['params'])
    backtest.test()

    outcome = backtest.get_results()
    init_time = backtest.get_init_execution_time()
    test_time = backtest.get_test_execution_time()
    total_time = init_time + test_time

    job_id = job['id']
    db.save_job_results(job_id, outcome)
    db.finish_job_by_id(job_id, total_time)
# Script body: process up to --times jobs unless another instance of this
# script is already running.
#
# The command line is parsed first so that --help and malformed arguments are
# handled before the process scan and before a database connection is opened.
parser = argparse.ArgumentParser()
parser.add_argument('--times', type=int, required=False, default=1,
                    help='The number of jobs the script should process.')
args = parser.parse_args()

if is_running('main.py'):
    print("Script is already running")
else:
    db = Database()
    db.connect(config.db_host, config.db_user, config.db_pass, config.db_name)

    for _ in range(args.times):
        job = get_next_job(db)
        if job is None:
            # Nothing left to assign to this worker; stop early.
            print("No more jobs")
            break
        process_job(db, job)