Commit 236889b

Revert "ENH: Use joblib.Parallel for Backtest.optimize(method='grid')"
This reverts commit 7b69b1f.
1 parent 1be3326 commit 236889b

File tree

3 files changed: +57 -16 lines

backtesting/backtesting.py

Lines changed: 44 additions & 14 deletions

@@ -8,9 +8,12 @@
 
 from __future__ import annotations
 
+import multiprocessing as mp
+import os
 import sys
 import warnings
 from abc import ABCMeta, abstractmethod
+from concurrent.futures import ProcessPoolExecutor, as_completed
 from copy import copy
 from functools import lru_cache, partial
 from itertools import chain, product, repeat
@@ -20,7 +23,6 @@
 
 import numpy as np
 import pandas as pd
-from joblib import Parallel, delayed
 from numpy.random import default_rng
 
 try:
@@ -1495,15 +1497,41 @@ def _optimize_grid() -> Union[pd.Series, Tuple[pd.Series, pd.Series]]:
                                     [p.values() for p in param_combos],
                                     names=next(iter(param_combos)).keys()))
 
-            with Parallel(prefer='threads', require='sharedmem', max_nbytes='50M',
-                          n_jobs=-2, return_as='generator') as parallel:
-                results = _tqdm(
-                    parallel(delayed(self._mp_task)(self, params, maximize=maximize)
-                             for params in param_combos),
-                    total=len(param_combos),
-                    desc='Backtest.optimize')
-                for value, params in zip(results, param_combos):
-                    heatmap[tuple(params.values())] = value
+            def _batch(seq):
+                n = np.clip(int(len(seq) // (os.cpu_count() or 1)), 1, 300)
+                for i in range(0, len(seq), n):
+                    yield seq[i:i + n]
+
+            # Save necessary objects into "global" state; pass into concurrent executor
+            # (and thus pickle) nothing but two numbers; receive nothing but numbers.
+            # With start method "fork", children processes will inherit parent address space
+            # in a copy-on-write manner, achieving better performance/RAM benefit.
+            backtest_uuid = np.random.random()
+            param_batches = list(_batch(param_combos))
+            Backtest._mp_backtests[backtest_uuid] = (self, param_batches, maximize)
+            try:
+                # If multiprocessing start method is 'fork' (i.e. on POSIX), use
+                # a pool of processes to compute results in parallel.
+                # Otherwise (i.e. on Windows), sequential computation will be "faster".
+                if mp.get_start_method(allow_none=False) == 'fork':
+                    with ProcessPoolExecutor() as executor:
+                        futures = [executor.submit(Backtest._mp_task, backtest_uuid, i)
+                                   for i in range(len(param_batches))]
+                        for future in _tqdm(as_completed(futures), total=len(futures),
+                                            desc='Backtest.optimize'):
+                            batch_index, values = future.result()
+                            for value, params in zip(values, param_batches[batch_index]):
+                                heatmap[tuple(params.values())] = value
+                else:
+                    if os.name == 'posix':
+                        warnings.warn("For multiprocessing support in `Backtest.optimize()` "
+                                      "set multiprocessing start method to 'fork'.")
+                    for batch_index in _tqdm(range(len(param_batches))):
+                        _, values = Backtest._mp_task(backtest_uuid, batch_index)
+                        for value, params in zip(values, param_batches[batch_index]):
+                            heatmap[tuple(params.values())] = value
+            finally:
+                del Backtest._mp_backtests[backtest_uuid]
 
             if pd.isnull(heatmap).all():
                 # No trade was made in any of the runs. Just make a random
@@ -1552,7 +1580,7 @@ def memoized_run(tup):
             stats = self.run(**dict(tup))
             return -maximize(stats)
 
-        progress = iter(_tqdm(repeat(None), total=max_tries, desc='Backtest.optimize'))
+        progress = iter(_tqdm(repeat(None), total=max_tries, leave=False, desc='Backtest.optimize'))
         _names = tuple(kwargs.keys())
 
         def objective_function(x):
@@ -1597,9 +1625,11 @@ def cons(x):
         return output
 
     @staticmethod
-    def _mp_task(bt, params, *, maximize):
-        stats = bt.run(**params)
-        return maximize(stats) if stats['# Trades'] else np.nan
+    def _mp_task(backtest_uuid, batch_index):
+        bt, param_batches, maximize_func = Backtest._mp_backtests[backtest_uuid]
+        return batch_index, [maximize_func(stats) if stats['# Trades'] else np.nan
+                             for stats in (bt.run(**params)
+                                           for params in param_batches[batch_index])]
 
     _mp_backtests: Dict[float, Tuple['Backtest', List, Callable]] = {}
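The comments in the restored block describe the approach this revert goes back to: the backtest, its parameter batches, and the maximize function are stashed in the class-level `Backtest._mp_backtests` dict under a random key, so the only objects pickled into the `ProcessPoolExecutor` are two numbers (the key and a batch index); under the 'fork' start method the worker processes inherit the parent's address space copy-on-write and can read that dict directly. Below is a minimal standalone sketch of the same pattern, not taken from the library; the names `_STATE`, `job_key`, `evaluate` and `run_batch` are illustrative only.

import multiprocessing as mp
import os
from concurrent.futures import ProcessPoolExecutor

# Hypothetical module-level store, playing the role of Backtest._mp_backtests.
_STATE = {}


def evaluate(x):
    # Stand-in for bt.run(**params): any CPU-bound function works here.
    return x * x


def run_batch(job_key, batch_index):
    # Only two small picklable values cross the process boundary; the
    # (potentially large) shared state is read from memory the forked
    # worker inherited copy-on-write, never serialized.
    batch = _STATE[job_key][batch_index]
    return batch_index, [evaluate(x) for x in batch]


if __name__ == '__main__':
    batches = [[1, 2, 3], [4, 5], [6]]
    job_key = os.urandom(8).hex()  # analogous to backtest_uuid
    _STATE[job_key] = batches
    try:
        if mp.get_start_method(allow_none=False) == 'fork':
            # POSIX with 'fork': workers see _STATE without pickling it.
            with ProcessPoolExecutor() as executor:
                futures = [executor.submit(run_batch, job_key, i)
                           for i in range(len(batches))]
                results = dict(f.result() for f in futures)
        else:
            # 'spawn'/'forkserver' workers would not see _STATE,
            # so compute sequentially, as the reverted code does.
            results = dict(run_batch(job_key, i) for i in range(len(batches)))
    finally:
        del _STATE[job_key]
    print(results)  # {0: [1, 4, 9], 1: [16, 25], 2: [36]}

For sizing, the restored `_batch` helper splits the combinations into chunks of roughly `len(param_combos) // os.cpu_count()`, clipped to between 1 and 300, so for example 1,000 combinations on 8 cores become 8 batches of 125. On POSIX systems where 'fork' is not the current default (e.g. macOS, which defaults to 'spawn' since Python 3.8), calling `multiprocessing.set_start_method('fork')` once at program start re-enables the parallel path, which is what the restored warning points at; on Windows 'fork' is unavailable and evaluation stays sequential.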

backtesting/test/_test.py

Lines changed: 13 additions & 1 deletion

@@ -621,6 +621,18 @@ def test_max_tries(self):
                                **OPT_PARAMS)
         self.assertEqual(len(heatmap), 6)
 
+    def test_multiprocessing_windows_spawn(self):
+        df = GOOG.iloc[:100]
+        kw = {'fast': [10]}
+
+        stats1 = Backtest(df, SmaCross).optimize(**kw)
+        with patch('multiprocessing.get_start_method', lambda **_: 'spawn'):
+            with self.assertWarns(UserWarning) as cm:
+                stats2 = Backtest(df, SmaCross).optimize(**kw)
+
+        self.assertIn('multiprocessing support', cm.warning.args[0])
+        assert stats1.filter(chars := tuple('[^_]')).equals(stats2.filter(chars)), (stats1, stats2)
+
     def test_optimize_invalid_param(self):
         bt = Backtest(GOOG.iloc[:100], SmaCross)
         self.assertRaises(AttributeError, bt.optimize, foo=range(3))
@@ -636,7 +648,7 @@ def test_optimize_speed(self):
         start = time.process_time()
         bt.optimize(fast=(2, 5, 7), slow=[10, 15, 20, 30])
         end = time.process_time()
-        self.assertLess(end - start, 1)
+        self.assertLess(end - start, .2)
 
 
 class TestPlot(TestCase):
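The new `test_multiprocessing_windows_spawn` exercises the sequential fallback without needing Windows: it patches `multiprocessing.get_start_method` with a plain callable that reports 'spawn', expects the `UserWarning` to fire, and compares the result against an unpatched run. A minimal sketch of that patching idiom, separate from the test suite (the function `pick_path` here is hypothetical):

import multiprocessing
from unittest.mock import patch


def pick_path():
    # Stand-in for the branch inside Backtest._optimize_grid().
    if multiprocessing.get_start_method(allow_none=False) == 'fork':
        return 'parallel'
    return 'sequential'


with patch('multiprocessing.get_start_method', lambda **_: 'spawn'):
    # The lambda accepts the allow_none=... keyword just like the real function.
    assert pick_path() == 'sequential'

Passing a bare lambda as the replacement, rather than a MagicMock, keeps the substitute accepting the real function's `allow_none` keyword while always returning a fixed value.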

setup.py

Lines changed: 0 additions & 1 deletion

@@ -34,7 +34,6 @@
         'numpy >= 1.17.0',
         'pandas >= 0.25.0, != 0.25.0',
         'bokeh >= 1.4.0, != 3.0.*, != 3.2.*',
-        'joblib',
     ],
     extras_require={
         'doc': [
