-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathdca_bot.py
703 lines (607 loc) · 25.8 KB
/
dca_bot.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
import ccxt
import sqlite3
import pandas as pd
import numpy as np
import time
import json
import logging
import threading
import hashlib
from datetime import datetime, timedelta
from typing import Dict, List, Optional
# ====================
# CONFIGURATIONS
# ====================
# Configure root logging at INFO level and create the logger used throughout this module.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger('DCA_Bot')
# Fee & slippage settings, expressed as fractions (not percent).
# The backtest simulator applies both to every simulated buy and sell.
BINANCE_TAKER_FEE = 0.001 # 0.1%
BACKTEST_SLIPPAGE = 0.0002 # 0.02%
# Partial-fill timeout.
# NOTE(review): not referenced anywhere in the visible code; unit is presumably seconds - confirm.
ORDER_FILL_TIMEOUT = 20
class DCABot:
def __init__(self, config_path: str = 'config.json'):
    """
    Prepare the bot for use.

    Steps, in order:
    - read the JSON config found at ``config_path``
    - build a ccxt Binance spot client, switch it to sandbox (testnet)
      mode and sync its clock offset with the exchange
    - open the local SQLite database ``dca_bot.db`` and create any
      missing tables
    - seed ``initial_capital`` with a default, then try to replace it
      with the balance reported by the exchange
    """
    self.config_path = config_path
    settings = self._load_config(config_path)
    self.config = settings

    # Exchange client (Binance spot, rate-limited by ccxt)
    exchange_settings = {
        'enableRateLimit': True,
        'apiKey': settings.get('apiKey', ''),
        'secret': settings.get('secret', ''),
        'options': {'defaultType': 'spot'},
    }
    client = ccxt.binance(exchange_settings)
    self.exchange = client
    # Sandbox mode is enabled before any request is made
    client.set_sandbox_mode(True)
    # Sync local clock to reduce timestamp errors
    client.load_time_difference()

    # Local database; the connection may be used from more than one thread
    self.conn = sqlite3.connect('dca_bot.db', check_same_thread=False)
    self._init_db()

    # Default starting capital, overwritten when the balance lookup succeeds
    self.initial_capital = 100000.0
    self._fetch_account_balance()
# --------------------------------------
# LOAD CONFIG
# --------------------------------------
def _load_config(self, path: str) -> Dict:
    """
    Read the JSON configuration file at ``path``.

    Returns the parsed JSON content. When the file does not exist, an
    error is logged and an empty dict is returned instead. Any other
    failure (e.g. malformed JSON) propagates to the caller.
    """
    try:
        handle = open(path, 'r')
    except FileNotFoundError:
        logger.error(f"Configuration file {path} not found.")
        return {}
    with handle:
        return json.load(handle)
# --------------------------------------
# DATABASE INITIALIZATION
# --------------------------------------
def _init_db(self):
    """
    Create the bot's SQLite tables when they do not exist yet.

    Runs one ``CREATE TABLE IF NOT EXISTS`` statement per table on
    ``self.conn`` and commits once at the end, so calling it repeatedly
    is harmless.
    """
    schema_statements = (
        # tokens_config: per-symbol strategy settings
        '''
        CREATE TABLE IF NOT EXISTS tokens_config (
            symbol TEXT PRIMARY KEY,
            base_order REAL,
            dca_order REAL,
            price_deviation REAL,
            take_profit REAL,
            stop_loss REAL,
            max_dca_orders INTEGER,
            dca_multiplier REAL,
            size_multiplier REAL,
            cooldown INTEGER,
            enabled BOOLEAN
        )
        ''',
        # historical_data: OHLCV candles, one row per (symbol, timestamp)
        '''
        CREATE TABLE IF NOT EXISTS historical_data (
            symbol TEXT,
            timestamp INTEGER,
            open REAL,
            high REAL,
            low REAL,
            close REAL,
            volume REAL,
            PRIMARY KEY (symbol, timestamp)
        )
        ''',
        # backtest_results: metrics per (symbol, parameter-set hash)
        '''
        CREATE TABLE IF NOT EXISTS backtest_results (
            symbol TEXT,
            params_hash TEXT,
            total_profit REAL,
            win_rate REAL,
            max_drawdown REAL,
            sharpe_ratio REAL,
            PRIMARY KEY (symbol, params_hash)
        )
        ''',
        # dca_rounds: one row per DCA round
        '''
        CREATE TABLE IF NOT EXISTS dca_rounds (
            round_id INTEGER PRIMARY KEY AUTOINCREMENT,
            symbol TEXT,
            side TEXT,
            avg_price REAL,
            total_quantity REAL,
            total_invested REAL,
            dca_count INTEGER,
            round_start_time INTEGER,
            round_status TEXT
        )
        ''',
        # dca_trades: individual orders belonging to a round
        '''
        CREATE TABLE IF NOT EXISTS dca_trades (
            trade_id INTEGER PRIMARY KEY AUTOINCREMENT,
            round_id INTEGER,
            order_id TEXT,
            symbol TEXT,
            side TEXT,
            order_type TEXT,
            price REAL,
            amount REAL,
            cost REAL,
            fee REAL DEFAULT 0,
            reason TEXT,
            timestamp INTEGER
        )
        ''',
    )
    db_cursor = self.conn.cursor()
    for ddl in schema_statements:
        db_cursor.execute(ddl)
    self.conn.commit()
# --------------------------------------
# INSERT TEST CONFIG
# --------------------------------------
def insert_test_config(self):
    """
    Insert (or overwrite) a sample ``tokens_config`` row for BTC/USDT.

    The statement is ``INSERT OR REPLACE``, so an existing BTC/USDT row
    is replaced by the sample values. The change is committed
    immediately and an info line is logged.
    """
    seed_row_sql = '''
        INSERT OR REPLACE INTO tokens_config (
            symbol, base_order, dca_order, price_deviation, take_profit, stop_loss,
            max_dca_orders, dca_multiplier, size_multiplier, cooldown, enabled
        ) VALUES (
            'BTC/USDT', 100, 50, 2.0, 5.0, 3.0, 5, 1.2, 1.1, 3600, 1
        )
    '''
    self.conn.execute(seed_row_sql)
    self.conn.commit()
    logger.info("Inserted test config for BTC/USDT")
# --------------------------------------
# FETCH ACCOUNT BALANCE
# --------------------------------------
def _fetch_account_balance(self, retries=3):
    """
    Seed ``self.initial_capital`` from the exchange's free USDT balance.

    Makes up to ``retries`` attempts. Each attempt re-syncs the clock
    offset and fetches the balance. An exchange error whose message
    mentions "Timestamp" triggers a 2-second pause and another attempt.

    ``self.initial_capital`` is only overwritten when a positive free
    USDT amount is reported. A missing USDT entry or a ``None`` free
    amount is treated as zero, so the previous value is kept.

    Args:
        retries: maximum number of attempts.

    Raises:
        ccxt.ExchangeError: for any exchange error that is not a
            timestamp problem.
    """
    for _attempt in range(retries):
        try:
            self.exchange.load_time_difference()  # Re-sync timestamp
            balance = self.exchange.fetch_balance()
        except ccxt.ExchangeError as e:
            if "Timestamp" not in str(e):
                raise
            logger.warning("Timestamp sync issue detected - retrying...")
            time.sleep(2)
            continue

        # `or` guards: the USDT entry or its 'free' field may be absent or None,
        # and comparing None with 0 would raise TypeError.
        usdt_available = (balance.get('USDT') or {}).get('free') or 0
        if usdt_available > 0:
            self.initial_capital = usdt_available
            logger.info(f"Balance check successful: {usdt_available:.2f} USDT")
        else:
            logger.warning("No free USDT balance reported - keeping default initial capital.")
        return

    # Every attempt hit a timestamp error; make the fallback visible in the log.
    logger.warning(f"Balance check failed after {retries} attempts - keeping default initial capital.")
# --------------------------------------
# HISTORICAL DATA FETCH
# --------------------------------------
def fetch_historical_data(self, symbol: str, timeframe: str = '1m', days: int = 100):
    """
    Download recent OHLCV candles for ``symbol`` and store the unseen ones.

    Candles are requested page by page (1000 per request) starting
    ``days`` days before now, pausing for the exchange's rate limit
    between pages. Rows whose timestamp is already stored for this
    symbol, or repeated within the download, are skipped. The remaining
    rows are written to ``historical_data`` and committed.

    Args:
        symbol: market symbol, e.g. 'BTC/USDT'.
        timeframe: candle interval passed to the exchange.
        days: how far back to start, in days.

    Returns:
        None.
    """
    logger.info(f"Fetching {timeframe} data for {symbol} for last {days} days")

    # Epoch milliseconds computed directly from the clock. Formatting a naive
    # local datetime as ISO-8601 and parsing it back would be read as UTC and
    # shift the window by the local UTC offset.
    since = int((time.time() - days * 86400) * 1000)

    cursor = self.conn.cursor()
    # Bound parameter: the symbol is data, never part of the SQL text.
    cursor.execute('SELECT timestamp FROM historical_data WHERE symbol = ?', (symbol,))
    known_timestamps = {row[0] for row in cursor.fetchall()}

    page_size = 1000
    new_rows = []
    while True:
        ohlcv = self.exchange.fetch_ohlcv(symbol, timeframe, since, page_size)
        if not ohlcv:
            break
        for candle in ohlcv:
            ts = candle[0]
            if ts in known_timestamps:
                continue
            known_timestamps.add(ts)  # also drops repeats inside this download
            new_rows.append((symbol, ts, candle[1], candle[2], candle[3], candle[4], candle[5]))
        next_since = ohlcv[-1][0] + 1
        if len(ohlcv) < page_size or next_since <= since:
            # Short page => caught up; non-advancing cursor => avoid looping forever.
            break
        since = next_since
        time.sleep(self.exchange.rateLimit / 1000)

    if not new_rows:
        logger.info(f"No new data to insert for {symbol}")
        return

    # executemany binds 7 values per statement, so the write stays far below
    # SQLite's per-statement variable limit. OR IGNORE protects the primary key
    # if another writer stored a row in the meantime.
    cursor.executemany(
        'INSERT OR IGNORE INTO historical_data '
        '(symbol, timestamp, open, high, low, close, volume) '
        'VALUES (?, ?, ?, ?, ?, ?, ?)',
        new_rows,
    )
    self.conn.commit()
    logger.info(f"Inserted {len(new_rows)} new rows for {symbol}")
# --------------------------------------
# CORE BACKTEST LOGIC
# --------------------------------------
def backtest_strategy(self, symbol: str, param_ranges: Dict) -> Dict:
    """
    Run a grid search over ``param_ranges`` and return the best combination.

    Every combination of 'price_deviation', 'take_profit' and
    'dca_multiplier' (each falling back to a single default value when
    absent) is backtested on the stored closes for ``symbol``, and each
    result is persisted. The combination with the highest total profit
    wins; note that "highest" does not imply "positive".

    Returns an empty dict when no historical data is stored for the
    symbol or no combination was evaluated.
    """
    logger.info(f"Backtesting {symbol} with param ranges: {param_ranges}")
    df = pd.read_sql(f'''
        SELECT timestamp, close
        FROM historical_data
        WHERE symbol=?
        ORDER BY timestamp
    ''', self.conn, params=(symbol,))
    if df.empty:
        logger.warning(f"No historical data for {symbol}. Fetch first.")
        return {}

    # Candidate values per parameter
    devs = param_ranges.get('price_deviation', [2.0])
    tps = param_ranges.get('take_profit', [5.0])
    dca_mults = param_ranges.get('dca_multiplier', [1.2])
    grid = [
        {'price_deviation': dv, 'take_profit': tp, 'dca_multiplier': dm}
        for dv in devs
        for tp in tps
        for dm in dca_mults
    ]

    winner = None
    winner_profit = -np.inf
    for candidate in grid:
        outcome = self._run_single_backtest(df, candidate)
        self._store_backtest_result(symbol, outcome)
        if outcome['total_profit'] > winner_profit:
            winner_profit = outcome['total_profit']
            winner = outcome['params']

    if winner:
        logger.info(f"BEST Params for {symbol}: {winner}, profit={winner_profit:.2f}")
    return winner if winner else {}
def _run_single_backtest(self, df: pd.DataFrame, params: Dict) -> Dict:
    """
    Simulate one long-only DCA parameter set over a series of close prices.

    The simulation starts with 10,000 of quote currency and walks the
    closes in order:
    - flat: buy a 1,000 base order (if capital allows); no further
      action is taken on that bar
    - in a position: buy a 500 safety order when the price falls below
      ``avg_price * (1 - price_deviation% * dca_multiplier ** dca_count)``
      (at most 5 per round), then sell everything once the price reaches
      ``avg_price * (1 + take_profit%)``

    Buys fill at ``price * (1 + BACKTEST_SLIPPAGE)`` and sells at
    ``price * (1 - BACKTEST_SLIPPAGE)``; ``BINANCE_TAKER_FEE`` is taken
    from the quantity bought and from the sale proceeds. A position that
    is still open after the last bar is closed at the last price.

    Args:
        df: frame with a 'close' column, ordered oldest to newest.
        params: dict with 'price_deviation' and 'take_profit' (both in
            percent) and 'dca_multiplier'.

    Returns:
        dict with:
        - 'params': the ``params`` argument, unchanged
        - 'total_profit': sum of profits of all closed rounds
        - 'win_rate': share of bars on which equity rose (per bar, not
          per trade)
        - 'max_drawdown': largest fractional drop of equity below its
          running peak
        - 'sharpe_ratio': mean/std of per-bar returns, annualized on the
          assumption of 1-minute bars
    """
    logger.debug("=== BACKTEST START ===")
    logger.debug(f"Parameters: {params}")
    logger.debug(f"Data length: {len(df)} rows")

    price_dev_pct = params['price_deviation'] / 100.0
    take_profit_pct = params['take_profit'] / 100.0
    dca_mult = params['dca_multiplier']

    capital = 10000.0
    base_order_usd = 1000.0
    dca_order_usd = 500.0
    max_dca_orders = 5

    position_size = 0.0
    avg_price = 0.0
    total_invested = 0.0
    dca_count = 0
    realized_profit = 0.0

    # For advanced metrics
    eq_curve = []
    peak_equity = capital
    max_drawdown = 0.0
    trade_count = 0  # number of trades executed

    close_prices = df['close'].values
    n_rows = len(close_prices)

    for i, price in enumerate(close_prices):
        # Debug log every 2000 steps for large data
        if i % 2000 == 0 and i != 0:
            logger.debug(f"Row {i}/{n_rows} - Price: {price}")

        if position_size == 0:
            # Flat: open a new round if affordable. Nothing else happens on
            # this bar, so a zero-size position never reaches the TP check.
            if capital >= base_order_usd:
                fill_price = price * (1 + BACKTEST_SLIPPAGE)
                fill_qty = (base_order_usd / fill_price) * (1 - BINANCE_TAKER_FEE)
                position_size = fill_qty
                avg_price = fill_price
                total_invested = base_order_usd
                capital -= base_order_usd
                trade_count += 1
                logger.debug(f"[BASE] Filled {fill_qty:.4f} at {fill_price:.2f}, capital={capital:.2f}")
        else:
            # In position: check DCA
            if dca_count < max_dca_orders:
                threshold_price = avg_price * (1 - price_dev_pct * (dca_mult ** dca_count))
                if price < threshold_price and capital >= dca_order_usd:
                    fill_price = price * (1 + BACKTEST_SLIPPAGE)
                    fill_qty = (dca_order_usd / fill_price) * (1 - BINANCE_TAKER_FEE)
                    # Weighted average
                    # NOTE(review): the safety order is weighted by its gross cost
                    # while the existing position is weighted by net quantity.
                    total_cost = (position_size * avg_price) + dca_order_usd
                    position_size += fill_qty
                    avg_price = total_cost / position_size
                    capital -= dca_order_usd
                    total_invested += dca_order_usd
                    dca_count += 1
                    trade_count += 1
                    logger.debug(f"[DCA #{dca_count}] price={fill_price:.2f}, pos_size={position_size:.4f}")

            # Check take profit
            tp_price = avg_price * (1 + take_profit_pct)
            if price >= tp_price:
                # Sell entire position
                fill_price = price * (1 - BACKTEST_SLIPPAGE)
                sale_value = position_size * fill_price * (1 - BINANCE_TAKER_FEE)
                profit = sale_value - total_invested
                realized_profit += profit
                capital += sale_value
                logger.debug(f"[TP] profit={profit:.2f}, new capital={capital:.2f}")
                # Reset
                position_size = 0.0
                avg_price = 0.0
                total_invested = 0.0
                dca_count = 0
                trade_count += 1

        # Track equity on every bar, entry bars included: cash plus the open
        # position marked at the current close. Recording cash alone on the
        # entry bar would show a fake drop followed by a fake rebound.
        cur_equity = capital + position_size * price
        eq_curve.append(cur_equity)
        peak_equity = max(peak_equity, cur_equity)
        if peak_equity > 0:
            # Drawdown is measured against the peak reached so far, so a low
            # that precedes the overall high is not counted against that high.
            max_drawdown = max(max_drawdown, (peak_equity - cur_equity) / peak_equity)

    # If position left open, close it at the last price
    if position_size > 0:
        last_price = close_prices[-1]
        fill_price = last_price * (1 - BACKTEST_SLIPPAGE)
        sale_value = position_size * fill_price * (1 - BINANCE_TAKER_FEE)
        profit = sale_value - total_invested
        realized_profit += profit
        capital += sale_value
        position_size = 0.0
        logger.debug(f"[FINAL CLOSE] profit={profit:.2f}, capital={capital:.2f}")

    # Build a returns series
    returns = pd.Series(eq_curve, dtype=float).pct_change().dropna()
    win_rate = 0.0
    sharpe_ratio = 0.0
    if len(returns) > 0:
        # Share of bars with a positive equity change
        win_rate = float((returns > 0).sum()) / len(returns)
        spread = returns.std()
        if spread > 0:
            # Naive annualization that assumes 1-minute bars (~525600 per year)
            sharpe_ratio = float((returns.mean() / spread) * np.sqrt(24*60*365))

    logger.debug("=== BACKTEST RESULTS ===")
    logger.debug(f"Final capital: {capital:.2f}")
    logger.debug(f"Realized profit: {realized_profit:.2f}")
    logger.debug(f"Trade count: {trade_count}")
    logger.debug("=== END BACKTEST ===")

    return {
        'params': params,
        'total_profit': float(realized_profit),
        'win_rate': win_rate,
        'max_drawdown': float(max_drawdown),
        'sharpe_ratio': sharpe_ratio
    }
def _store_backtest_result(self, symbol: str, result: Dict):
    """
    Persist one backtest result in ``backtest_results``.

    The row is keyed by the symbol and the MD5 hex digest of the
    parameter dict serialized as JSON with sorted keys, so the same
    parameter set always maps to the same row. An existing row for that
    key is replaced. The write is committed immediately.
    """
    serialized_params = json.dumps(result['params'], sort_keys=True)
    digest = hashlib.md5(serialized_params.encode('utf-8')).hexdigest()
    record = (
        symbol,
        digest,
        result['total_profit'],
        result['win_rate'],
        result['max_drawdown'],
        result['sharpe_ratio'],
    )
    self.conn.execute('''
        INSERT OR REPLACE INTO backtest_results
        VALUES (?, ?, ?, ?, ?, ?)
    ''', record)
    self.conn.commit()
# --------------------------------------
# LOGIC VALIDATION & TEST SCENARIOS
# --------------------------------------
def _verify_backtest_logic(self):
    """
    Run the backtester on three synthetic price series as a sanity check.

    Scenarios:
    1. flat prices with fees and slippage set to zero: profit must be ~0
    2. steadily rising prices with the normal fees/slippage: profit > 0
    3. a jump from 100 to 105 with a 5% take profit: profit > 0

    The module-level fee and slippage constants are changed temporarily
    for scenario 1 and are always restored, even when a check fails.
    Because module globals are modified, this should not run while
    another thread is backtesting.

    Raises:
        AssertionError: when a scenario does not produce the expected
            result.
    """
    logger.info("Running logic verification tests...")
    # Use global declaration to modify module-level variables
    global BINANCE_TAKER_FEE, BACKTEST_SLIPPAGE
    # Save original values
    original_fee = BINANCE_TAKER_FEE
    original_slippage = BACKTEST_SLIPPAGE

    def _require(condition, message):
        # Explicit raise instead of an `assert` statement: asserts are removed
        # under `python -O`, which would silently disable every check here.
        if not condition:
            raise AssertionError(message)

    try:
        # 1) Flat line test (disable fees/slippage)
        BINANCE_TAKER_FEE = 0.0
        BACKTEST_SLIPPAGE = 0.0
        test_data = pd.DataFrame({'close': [100]*1000, 'timestamp': range(1000)})
        flat_params = {'price_deviation': 2.0, 'take_profit': 5.0, 'dca_multiplier': 1.2}
        result_flat = self._run_single_backtest(test_data, flat_params)
        _require(
            abs(result_flat['total_profit']) < 1e-6,
            f"Flat market should yield 0 profit. Got {result_flat['total_profit']}"
        )

        # 2) Steady rise (restore fees/slippage)
        BINANCE_TAKER_FEE = original_fee
        BACKTEST_SLIPPAGE = original_slippage
        steady_data = pd.DataFrame({'close': np.linspace(100, 200, 1000), 'timestamp': range(1000)})
        result_steady = self._run_single_backtest(steady_data, flat_params)
        _require(result_steady['total_profit'] > 0, "Rising market => expect profit")

        # 3) Immediate TP
        immediate_data = pd.DataFrame({'close': [100, 105, 105], 'timestamp': [0,1,2]})
        immediate_params = {'price_deviation': 1.0, 'take_profit': 5.0, 'dca_multiplier': 1.0}
        result_immediate = self._run_single_backtest(immediate_data, immediate_params)
        _require(result_immediate['total_profit'] > 0, "Immediate TP => quick profit")

        logger.info("All logic verification tests passed.")
    finally:
        # Always restore original values
        BINANCE_TAKER_FEE = original_fee
        BACKTEST_SLIPPAGE = original_slippage
# --------------------------------------
# PARALLEL / SHADOW TRADING
# --------------------------------------
def run_backtest_validation(self, symbol: str):
    """
    Compare a backtest with a paper-trading pass for ``symbol``.

    1) Delete ALL stored candles for the symbol and fetch one fresh day
       of 1-minute data
    2) Grid-search a small parameter range on that day
    3) Run the strategy once in paper mode with the best parameters
    4) Backtest the best parameters again and log both profits

    Note that step 1 discards any longer history fetched earlier for
    this symbol; later backtests only see the one-day data set.
    """
    # Clear existing validation data. The symbol is passed as a bound
    # parameter rather than spliced into the SQL text, and the delete is
    # committed so it does not depend on a later write to become permanent.
    self.conn.execute('DELETE FROM historical_data WHERE symbol = ?', (symbol,))
    self.conn.commit()
    # Fetch new 1-day dataset
    self.fetch_historical_data(symbol, '1m', days=1)

    # Let's pick a small range for param testing
    param_test = {
        'price_deviation': [1.0, 2.0],
        'take_profit': [3.0, 5.0],
        'dca_multiplier': [1.0, 1.2]
    }
    best_params = self.backtest_strategy(symbol, param_test)
    if not best_params:
        logger.warning(f"No profitable params found for {symbol} in last day. Validation aborted.")
        return

    # Assume best_params is good
    logger.info(f"Paper-trading with best params: {best_params}")
    paper_results = self._execute_dca_strategy(symbol, best_params, paper_trading=True)

    # Compare with backtest again (maybe)
    history = pd.read_sql(
        'SELECT timestamp, close FROM historical_data WHERE symbol = ? ORDER BY timestamp',
        self.conn,
        params=(symbol,),
    )
    backtest_res = self._run_single_backtest(history, best_params)
    self._compare_results(backtest_res, paper_results)
def _compare_results(self, backtest_res: Dict, paper_results: Dict):
    """
    Log the backtest profit next to the paper-trading profit.

    Reads ``backtest_res['total_profit']`` and ``paper_results['profit']``;
    nothing is returned or stored.
    """
    simulated = backtest_res['total_profit']
    observed = paper_results['profit']
    logger.info(f"Comparing Backtest => {simulated:.2f} profit vs Paper => {observed:.2f} profit")
# --------------------------------------
# EXECUTION - PAPER OR LIVE
# --------------------------------------
def execute_dca_strategy_if_profitable(self, symbol: str, param_ranges: Dict):
    """
    Trade ``symbol`` for real only when the backtest is profitable.

    1) Grid-search ``param_ranges`` on the stored history
    2) Re-run the best parameter set once and check its total profit
    3) If that profit is positive, start the non-paper strategy pass;
       otherwise log a warning and do nothing

    Returns None in every case.
    """
    best_params = self.backtest_strategy(symbol, param_ranges)
    if not best_params:
        logger.warning(f"No profitable parameters found for {symbol}. Skipping real trading.")
        return

    # Check final best param's profit via direct single-run.
    # The symbol is a bound parameter, never part of the SQL text.
    df = pd.read_sql(
        'SELECT timestamp, close FROM historical_data WHERE symbol = ? ORDER BY timestamp',
        self.conn,
        params=(symbol,),
    )
    final_test = self._run_single_backtest(df, best_params)
    if final_test['total_profit'] <= 0:
        logger.warning("Backtest not profitable. Real trading aborted.")
        return

    # If profitable => start live trading
    logger.info(f"Backtest shows profit: {final_test['total_profit']:.2f}. Proceeding with real trades on {symbol}.")
    self._execute_dca_strategy(symbol, best_params, paper_trading=False)
def _execute_dca_strategy(self, symbol: str, params: Dict, paper_trading=True) -> Dict:
    """
    Run a single, minimal pass of the DCA strategy (demonstration only).

    The pass looks up a price (ticker price; in paper mode the order
    book mid-price when available) and, when the symbol has no open
    round, logs the base buy it would make. No order is placed and
    nothing is written to the database, in paper mode or otherwise.

    Args:
        symbol: market symbol.
        params: strategy parameters (currently unused by this pass).
        paper_trading: when true, prefer the order book mid-price.

    Returns:
        dict with 'symbol' and 'profit'. The profit is a random
        placeholder between 1 and 499, not a measured result.
    """
    # We'll just simulate 1 pass for demonstration
    current_price = self._get_current_price(symbol)

    # If paper trading, we might use mid-price from order book
    if paper_trading:
        try:
            ob = self.exchange.fetch_order_book(symbol)
            bid = ob['bids'][0][0]
            ask = ob['asks'][0][0]
            current_price = (bid + ask)/2
        except Exception as exc:
            # Best effort: keep the ticker price, but leave a trace instead of
            # hiding the failure (a bare except would also trap Ctrl-C).
            logger.warning(f"Order book unavailable for {symbol}, using ticker price: {exc}")

    # minimal logic: If no open round => base order
    open_round_id = self._get_open_round_id(symbol)
    if not open_round_id:
        if current_price and current_price > 0:
            # We'll just do a market buy simulation. In real code, you'd do partial fill checks, etc.
            base_order_usd = 100
            qty = base_order_usd / current_price
            logger.info(f"(Paper={paper_trading}) Base buy {qty:.4f} {symbol} at {current_price:.2f}")
            # If not paper_trading => place real ccxt order
            # Insert/track 'open round' in DB if you want continuity
        else:
            # The price lookup reports failure as 0.0; sizing an order from it
            # would divide by zero.
            logger.error(f"No valid price for {symbol}; skipping base order.")

    # Return some dummy results for now
    return {
        'symbol': symbol,
        'profit': np.random.randint(1, 500),  # mock profit
    }
# --------------------------------------
# DB / UTILITY METHODS
# --------------------------------------
def _get_open_round_id(self, symbol: str) -> Optional[int]:
    """
    Return the id of an open DCA round for ``symbol``, or None.

    Looks for a ``dca_rounds`` row with this symbol and
    ``round_status`` equal to 'open'. When several rows match, the
    first one returned by the database is used.
    """
    cursor = self.conn.cursor()
    # The status is bound as a value. Writing it as "open" in double quotes
    # makes SQLite parse it as an identifier first and only fall back to a
    # string through a legacy compatibility feature that can be switched off.
    cursor.execute(
        'SELECT round_id FROM dca_rounds WHERE symbol = ? AND round_status = ?',
        (symbol, 'open'),
    )
    row = cursor.fetchone()
    return row[0] if row else None
def _get_current_price(self, symbol: str) -> float:
    """
    Return the last traded price for ``symbol`` as a float.

    Any failure while fetching or converting the price is logged as a
    warning and reported to the caller as 0.0, so callers must treat
    0.0 as "no price available".
    """
    try:
        last_trade = self.exchange.fetch_ticker(symbol)['last']
        quote = float(last_trade)
    except Exception as e:
        logger.warning(f"Error fetching price for {symbol}: {e}")
        quote = 0.0
    return quote
def _get_enabled_symbols(self) -> List[str]:
    """Return the symbols from ``tokens_config`` whose ``enabled`` flag is 1."""
    matches = self.conn.execute('SELECT symbol FROM tokens_config WHERE enabled=1').fetchall()
    return [name for (name,) in matches]
# --------------------------------------
# DAILY OPTIMIZATION LOOP
# --------------------------------------
def optimize_parameters(self, interval_seconds: float = 86400, max_cycles: Optional[int] = None):
    """
    Periodically re-optimize the strategy parameters of every enabled symbol.

    Each cycle grid-searches a fixed parameter range per enabled symbol
    and writes the best combination back to ``tokens_config``. A failure
    for one symbol is logged with its traceback and does not stop the
    remaining symbols or later cycles, so the background thread running
    this loop stays alive.

    Args:
        interval_seconds: pause between cycles (default: one day).
        max_cycles: number of cycles to run; None (default) runs
            forever, as before.
    """
    # The search grid is the same for every symbol and cycle.
    param_ranges = {
        'price_deviation': np.arange(1.0, 3.0, 1.0),
        'take_profit': np.arange(2.0, 6.0, 1.0),
        'dca_multiplier': np.arange(1.0, 1.5, 0.2)
    }
    cycles_done = 0
    while max_cycles is None or cycles_done < max_cycles:
        symbols = self._get_enabled_symbols()
        for symbol in symbols:
            try:
                best_params = self.backtest_strategy(symbol, param_ranges)
                if best_params:
                    self._update_symbol_config(symbol, best_params)
            except Exception:
                # Loop boundary of a long-running worker: record and move on.
                logger.exception(f"Parameter optimization failed for {symbol}")
        cycles_done += 1
        if max_cycles is not None and cycles_done >= max_cycles:
            break  # no point sleeping after the final cycle
        time.sleep(interval_seconds)
def _update_symbol_config(self, symbol: str, new_params: Dict):
    """
    Write new strategy parameters for ``symbol`` into ``tokens_config``.

    Only 'price_deviation', 'take_profit' and 'dca_multiplier' are
    updated; all three keys must be present in ``new_params``. The
    change is committed immediately. A symbol without a config row is
    left untouched (the UPDATE matches nothing).
    """
    bound_values = (
        new_params['price_deviation'],
        new_params['take_profit'],
        new_params['dca_multiplier'],
        symbol,
    )
    self.conn.execute('''
        UPDATE tokens_config
        SET price_deviation=?,
        take_profit=?,
        dca_multiplier=?
        WHERE symbol=?
    ''', bound_values)
    self.conn.commit()
# --------------------------------------
# Forcible Liquidation / Risk Mgt
# --------------------------------------
def _check_portfolio_drawdown(self, threshold: float = 0.1):
    """
    Liquidate everything when free USDT drops too far below the start.

    Example logic: the free USDT balance is compared with
    ``initial_capital * (1 - threshold)``; when it is lower, a warning
    is logged and all positions are liquidated. Only free USDT is
    looked at, not the value of open positions. Any error raised during
    the check is logged and swallowed.
    """
    try:
        usdt_holdings = self.exchange.fetch_balance().get('USDT', {})
        usdt_now = usdt_holdings.get('free', 0)
        floor = self.initial_capital*(1-threshold)
        if usdt_now < floor:
            logger.warning("Drawdown limit triggered! Liquidating all positions.")
            self._liquidate_all_positions()
    except Exception as e:
        logger.error(f"Drawdown check error: {e}")
def _liquidate_all_positions(self):
    """
    Placeholder for closing every open position; currently does nothing.

    The implementation depends on how many rounds are open, etc.
    """
# --------------------------------------------
# SCRIPT ENTRY (EXAMPLE USAGE)
# --------------------------------------------
if __name__ == "__main__":
    # Example run: set up the bot, validate the backtester, load data,
    # trade only if the backtest is profitable, then keep monitoring.
    bot = DCABot(config_path='config.json')

    # 1) Insert test config if needed
    bot.insert_test_config()

    # 2) Run verification tests on synthetic scenarios
    bot._verify_backtest_logic()

    # 3) Fetch 100 days of 1-minute data for BTC/USDT
    bot.fetch_historical_data('BTC/USDT', '1m', days=100)

    # 4) Optionally do a quick shadow trading validation
    bot.run_backtest_validation('BTC/USDT')

    # 5) Only trade live if the strategy is profitable
    param_ranges = dict(
        price_deviation=[2.0],
        take_profit=[5.0],
        dca_multiplier=[1.2],
    )
    bot.execute_dca_strategy_if_profitable('BTC/USDT', param_ranges)

    # 6) (Optional) Start daily parameter optimization in background.
    # Daemon thread: it ends together with the main thread.
    t = threading.Thread(target=lambda: bot.optimize_parameters(), daemon=True)
    t.start()

    # Keep the script alive: check the drawdown limit once a minute
    while True:
        bot._check_portfolio_drawdown()
        time.sleep(60)