-
Notifications
You must be signed in to change notification settings - Fork 203
/
Copy pathrun_multi_functions_multi_trials_parallel.py
79 lines (67 loc) · 2.6 KB
/
run_multi_functions_multi_trials_parallel.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
#!/usr/bin/env python
# Created by "Thieu" at 11:37, 02/03/2022 ----------%
# Email: [email protected] %
# Github: https://github.com/thieu1995 %
# --------------------------------------------------%
import concurrent.futures as parallel
from pathlib import Path
from opfunu.cec_basic import cec2014_nobias
from pandas import DataFrame
from mealpy.evolutionary_based.DE import BaseDE
model_name = "DE"
N_TRIALS = 5
LB = [-100, ] * 15
UB = [100, ] * 15
verbose = True
epoch = 100
pop_size = 50
wf = 0.8
cr = 0.9
func_names = ["F1", "F2", "F3"]
PATH_ERROR = "history/error/" + model_name + "/"
PATH_BEST_FIT = "history/best_fit/"
Path(PATH_ERROR).mkdir(parents=True, exist_ok=True)
Path(PATH_BEST_FIT).mkdir(parents=True, exist_ok=True)
def find_minimum(function_name):
"""
We can run multiple functions at the same time.
Each core (CPU) will handle a function, each function will run N_TRIALS times
"""
print(f"Start running: {function_name}")
error_full = {}
error_columns = []
best_fit_list = []
for id_trial in range(1, N_TRIALS + 1):
problem = {
"fit_func": getattr(cec2014_nobias, function_name),
"lb": LB,
"ub": UB,
"minmax": "min",
"log_to": "console",
"name": function_name
}
model = BaseDE(epoch=epoch, pop_size=pop_size, wf=wf, cr=cr, name=model_name)
_, best_fitness = model.solve(problem)
temp = f"trial_{id_trial}"
error_full[temp] = model.history.list_global_best_fit
error_columns.append(temp)
best_fit_list.append(best_fitness)
df = DataFrame(error_full, columns=error_columns)
df.to_csv(f"{PATH_ERROR}{len(LB)}D_{model_name}_{function_name}_error.csv", header=True, index=False)
print(f"Finish function: {function_name}")
return {
"func_name": function_name,
"best_fit_list": best_fit_list,
"model_name": model_name
}
if __name__ == '__main__':
## Run model
best_fit_full = {}
best_fit_columns = []
with parallel.ProcessPoolExecutor() as executor:
results = executor.map(find_minimum, func_names)
for result in results:
best_fit_full[result["func_name"]] = result["best_fit_list"]
best_fit_columns.append(result["func_name"])
df = DataFrame(best_fit_full, columns=best_fit_columns)
df.to_csv(f"{PATH_BEST_FIT}/{len(LB)}D_{model_name}_best_fit.csv", header=True, index=False)