Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Live trading support #81

Open
sdmovie opened this issue May 28, 2020 · 23 comments · May be fixed by #1009
Open

Live trading support #81

sdmovie opened this issue May 28, 2020 · 23 comments · May be fixed by #1009
Labels
API Needs API-related discussion enhancement New feature or request

Comments

@sdmovie
Copy link

sdmovie commented May 28, 2020

Expected Behavior

I am trying to bind the realtrade into this frame work. Of course replacements of Backtest , _Broker , and datafeeding need rewrite. I got them (data API, broker API) on hand and had some ideas to plant in.
But I want to hear from @kernc you on your advices/thought first before start, esp. if there are already a reference ~

Actual Behavior

Steps to Reproduce

Additional info

  • Backtesting version:
@kernc
Copy link
Owner

kernc commented May 28, 2020

Afaik, there were no prior efforts of getting real, live trading in. I guess _Broker is certainly to be overridden. Just please do base your findings upon #47 as I do eventually intend to get that in.

@sdmovie
Copy link
Author

sdmovie commented May 29, 2020

after furthur study, I think practical approch is to mirror backtesting.py to a realtrading.py file (not disturbing original backtesting.py), still share Strategy class from backtesting.py. Backtest class, _broker, position need rewrite in new file, order/trade not sure as of now, probably keep as mirror of trade serverside data. No obvious barrier seen now for the real trade implementation(very broker API specific) :-)
The way of passing dynamic data into _broker,strategy using a _Data class is really smart , will copy the method in the realtrading.

@kernc
Copy link
Owner

kernc commented Jun 1, 2020

I envision live trading by way of a separate class, in Python type notation, say:

from typing import Type, Union, Literal, NoReturn

LiveTrade(strategy: Type[Strategy],
          period: str,
          broker: Union[Literal["ib", "oanda", "ccxt"], Broker],
          **kwargs) -> NoReturn

The idea is we can reuse the same Strategy (hence we must adapt the _Broker), we must provide the trading cyclic period which to call Strategy.next() on (should likely match the period in bars of backtested-on data), and the chosen broker backend (here a string, mapping to each standalone class internally). Extra kwargs (a list of instruments, margins, and such) would be passed to the broker.

The common broker API is sure in the domain of its implementer. 😄

@kernc kernc changed the title can you give some hints on real trade Live trading support Jul 8, 2020
@kernc kernc added the enhancement New feature or request label Jul 8, 2020
@kernc kernc added the API Needs API-related discussion label Jul 15, 2020
@arunavo4
Copy link

@kernc This needs some support, there has to be an abstract base class for live trading so that people can implement the Brokers of their choice as they like and ccxt library( for Crypto ) support as an example.

cc @ttfreeman Tagging you in case you are interested.

@ttfreeman
Copy link
Contributor

@arunavo4 Yes, definitely interested, though need to research on the best way to tackle this prior to diving in. I like @kernc 's suggestion to have a separate class (LiveTrade) but we might also need another class (LiveData) that would bridge the live data from broker to LiveTrade class.

feed=LiveData(....init params)
feed.connect(localhost, 567, ...)

LiveTrade( feed.data,
             strategy: Type[Strategy],
             period: str,
             broker: Union[Literal["ib", "oanda", "ccxt"], Broker],
             **kwargs) -> NoReturn

@kernc
Copy link
Owner

kernc commented Nov 26, 2020

Data acquisition can surely be handled by the chosen broker (i.e. Broker)? Broker implementations should take their configuration parameters from **kwargs.

@sdmovie
Copy link
Author

sdmovie commented Nov 28, 2020

practically my current process to live trading is rewrite strategy ( slight modification - thanks to the backtesting's clear structure and pandas compatible data carrier) , to fit in live broker/data source. Building a BT compatible live trading framework is not a small project - considering BT is single equity, single datafeed( though resample applicable) , while trading is diversified. Now I use backtesting.py to verify my alorithm( or subset of algo) which works pefectly.Then write trading in a flexible way.
ps. I give up portion buy/sell in BT, in real world(trading), broker has no percentage trade API, strategy is responsible of calculating target buy/sell volume, also 'cause calculation for buy-open/sell-open/buy-close/sell-close volume and margin is too complicate for me.

@binarymason
Copy link
Contributor

I'd recommend keeping data feeds and brokers separate responsibilities. That way theoretically one could subscribe to any data source then execute trades with the broker of their choice without being tightly coupled to anything.

Definitely agree that having a path for people to BYOB (broker) is the way to go. Then strategy.buy() or strategy.sell() is executed on their broker of choice, even if it is a backtest broker.

@crazy25000
Copy link
Contributor

Has anyone started work on this (that's shareable)?

@arunavo4
Copy link

@crazy25000 Have started working on it. but it's very integrated into my project. The gist is that we can easily (maybe not too easy) Make a class Similar to Backtest. Which can have Corresponding Broker Classes like CCXT/ Oanda/ Alpaca.

@workingxx92
Copy link

Waiting

@mac133k
Copy link

mac133k commented Oct 30, 2021

For those who are interested I found a workaround to generate trading signals with backetsting.py. I use a custom strategy class that extends the standard one:

  • The strategy class has a bool var to determine whether I want to run a backtest or check the conditions for the latest data.
  • In the init call we have the access to the entire data object, so if my bool var is set for generating signals I evaluate the entry conditions right there.
  • I have the methods for checking long and short entry conditions in my strategy class where I use indices for prices and indicators relative to the end of the data array (ie. [-1], [-2] etc.), so the methods work the same in backtesting and signal generation mode.
  • After evaluating the entry conditions I immediately check the order list and save the orders in an array to generate trading alerts elsewhere in the code.

This does not require any modification to the backtesting.py source code.

@guzuomuse
Copy link

guzuomuse commented Nov 15, 2021

@crazy25000 Have started working on it. but it's very integrated into my project. The gist is that we can easily (maybe not too easy) Make a class Similar to Backtest. Which can have Corresponding Broker Classes like CCXT/ Oanda/ Alpaca.

@arunavo4 could you share how your LiveTrade Class looks like?

@SpaceNerden
Copy link

Any updates on this? I would love to see this feature added.

@tumma72
Copy link

tumma72 commented May 23, 2022

Hi everyone, I am probably late to the party but I wanted to share that I am implementing similar ideas. I do use ccxt as broker for live trading and I managed to implement a Strategy object in my framework which extends the backtesting.py Strategy. My Strategy has a run_live() and a run_backtest() the latter calls BackTesting with self. The tricky part and not so elegant perhaps is that both frameworks have models such as Order and Position which aren’t really compatible. I refrained from going down the way of implementing a Broker extension of _Broker because it has clearly a backtesting design and purpose at the moment. It would be great if Broker would be more “open” to subclassing and alternative implementations. I’ll keep you posted on the progress if someone is interested.

@hakunin
Copy link

hakunin commented Jun 11, 2022

Hi everyone, I am probably late to the party but I wanted to share that I am implementing similar ideas. I do use ccxt as broker for live trading and I managed to implement a Strategy object in my framework which extends the backtesting.py Strategy. My Strategy has a run_live() and a run_backtest() the latter calls BackTesting with self. The tricky part and not so elegant perhaps is that both frameworks have models such as Order and Position which aren’t really compatible. I refrained from going down the way of implementing a Broker extension of _Broker because it has clearly a backtesting design and purpose at the moment. It would be great if Broker would be more “open” to subclassing and alternative implementations. I’ll keep you posted on the progress if someone is interested.

I'm about to do the same thing, can you share a gist of how you went about takcling it? Just some partial code would be great.

@tumma72
Copy link

tumma72 commented Jun 15, 2022

I'm about to do the same thing, can you share a gist of how you went about takcling it? Just some partial code would be great.

Hey @hakunin perhaps we can join forces on this, provided @kernc will provide support for merging later on. Here are some of the issues which would require a Backtesting.by redesign, as of now I am solving them with real-time override (not nice):

  1. It is quite easy to use Strategy as the center of the design of a live-trading enabled bot, provided we can adapt the Strategy.I() to support re-evaluation of live candlestick data, which are needed for live trading. At the moment I have implemented a method that swaps the _Data() object in self._data when there are new candlesticks available. Not really clean but kind of does the trick. A much better design would be for the logic to support paper-trading and live-trading without having to swap data under the carpet;
  2. At the moment the whole Backtesting.py framework only supports trading of one instrument/market at a time. While this might fit several cases, it definitely limits significantly what type of strategies could be implemented. For example on a portfolio scalping scenario, I would like to always retrieve the 10 top gainers, wait for a trend, cash in, change market to the next trending. I do not want to create a new instance of the same strategy for each new market that the screening algorithm finds, it is not efficient, and more over I want to do all of the calculations on a joint account, and measure the performance;
  3. While just compressing the whole strategy in a single method: Strategy.next() might be very simple and flexible, it is also leaving quite a lot on the plate of the user in terms of checking balances, order types... In my original framework I have started with the idea of just having a StrategyStep as an object, and having a Strategy.add_step(StrategyStep) to create whatever sequence of steps one may desire. At the end of an execution cycle, the OrderProcessor will take care of checking open orders, new orders... I have simplified this to facilitate unexperienced users, at the moment only several of my close friends, to create their own strategies by adding the following helpers:
  • should_buy(): where the user can implement the logic necessary when should a position be opened;
  • should_sell(): same for selling.
  • should_exit(): when it is time to exit, with profit or loss.
  • the corresponding do_buy(), do_sell(), do_exit() will implement the customized logic when the condition matches. Each of the "should" returns a boolean, each of the "do" returns a List[Order]. While this flow can be incapsulated in the .next() it introduces difficulties with the downloading of new data, and the replacing of the "current" indicators (see point 1).
  1. The way I have approached the Bot I am creating is by having a Runner, supporting asyncio, which starts with a configuration file (in JSON) in which different pairs of Exchange/Broker and Strategy are configured. This means the Runner runs the strategy, and in order to differentiate between different execution modes I have created 3 types: BackTesting, PaperTrading, LiveTrading. As the Runner instantiates the strategies and calls the .run() with the execution mode to get started, it is quite tricky to integrate the BackTesting mode with Backtesting.py because the Backtest object that wants to instantiate a Strategy. It would be great if it were possible to bass to Backtest not only a class but also an instance so that there wouldn't be different instances of the same strategy class at the same time. Because of a strategy dealing with multiple markets (see point 2) it would be more effective to encapsulate the execution of the backtesting within the strategy, rather than having to run it outside and then aggregate the results. At the moment I am running the backtesting for each of the markets, and then aggregating the data at the end, very slow and not effective.
  2. I would love to be able to have more Base Abstract Classes to extend the framework which are "public", and while would still implement the basic logic needed for Backtesting, would also allow to be extended to implement different accounting approaches, or exit, enter strategies depending on market trends. For example I would love to have as extendable classes:
  • Order: there are quite a lot of more information needed when trading live, than what is now visible
  • Broker: I have created an Exchange class that wraps the CCXT library to connect to different exchanges, I would love to have it extend Broker if it were more implementation independent (i.e.: depending on a lot of internal classes _Data, _Array...)
  • Position: same as order
  • Account: here things can really get complicated with different currencies conversion and future vs spot trading...

While I am far from being done, and I have to confess I have considered multiple times to give up with the integration, I am still trying and hoping that things will evolve towards a more open framework approach, which probably won't bother the final users, but would definitely allow the Backtesting.py to become part of many trading solutions out there 😉 I will keep you posted on the progress and as soon as I will have an end-to-end working sample I will share it with you.

@armwdev
Copy link

armwdev commented Jan 5, 2023

For those who are interested I found a workaround to generate trading signals with backetsting.py. I use a custom strategy class that extends the standard one:

  • The strategy class has a bool var to determine whether I want to run a backtest or check the conditions for the latest data.
  • In the init call we have the access to the entire data object, so if my bool var is set for generating signals I evaluate the entry conditions right there.
  • I have the methods for checking long and short entry conditions in my strategy class where I use indices for prices and indicators relative to the end of the data array (ie. [-1], [-2] etc.), so the methods work the same in backtesting and signal generation mode.
  • After evaluating the entry conditions I immediately check the order list and save the orders in an array to generate trading alerts elsewhere in the code.

This does not require any modification to the backtesting.py source code.

Live trading is a must have feature I think and I am really interested in which files or where to add those modifications. I have multiple indicators and added the bool in the custom strategy class. I've made similar modifications but I got an error and trying to solve this:
Indicators must return (optionally a tuple of) numpy.arrays of same length as 'data' (data shape: (11830,); indicator "wt_indicator(df)"shape: (7, 11831), returned value: [[ 0. 0. 0. ... 0. 0. 0. ] [ 0. 0. 0. ... 0. 0. 0. ] [ 0. 0. 0. ... 1. 1. 1. ] ... [ nan nan nan ... -1.5671848 10.43540248 13.05147345] [ nan nan nan ... -20.60514641 -7.00140005 7.30656371] [ 0. 0. 0. ... 1. 0. 0. ]])

@OppOops
Copy link

OppOops commented Feb 24, 2023

I have write a draft version based on comments above,
OppOops@1879d9d
the codes needs original Backtest class to run.

from backtesting.live import LiveTrade, LiveMarketOhlcv

bt = Backtest(df_partial, SmaCross, commission=.002,
              exclusive_orders=True)
live_trade = LiveTrade(bt)

For workaround here, I clone a mirror strategy and a mirror data instance.
Also, modify the _Data class to the _DataCachePatch and add antorher new class _ObjectBindingDict.

Below are some script codes modified from the example in Readme.md:
(full version at: https://gist.github.com/OppOops/282a2fb3d07019618414d2a51e9084a1)

def run_with_live_new_ohlcv(live_trade: LiveTrade):
    # select data from external (iteration not run currently)
    ...
    live_trade.on_bar(LiveMarketOhlcv(...))
    live_trade.run_next()

bt = Backtest(df_partial, SmaCross, commission=.002,
              exclusive_orders=True)
live_trade = LiveTrade(bt)
live_trade.init()

live_trade.run(to_end=True)
...
run_with_live_new_ohlcv(live_trade)

You can controll LiveTrade class with backtest core to run or stop at any Ohlcv bar.
Also with on_bar to insert new ohlcv data to it.
Use some other features like close_last_positions, process_orders to generate or submit your orders

I haven't test detailed in my version, but just comaparsion of GOOG data result with sample.
Currently the value of broker._equity[-1] will be different with original version.
(I have test entry price and trade size are same mostly, but need further checking.)

@tylerstar
Copy link

I have developed a plugin for live trading, but it's made up of homemade code and contains a lot of customizations. Therefore, I cannot release it yet. However, the basic logic is similar to that of backtrader's store.

To begin, you will need to create a new class called "LiveTrading" that will replace "Backtest." This class will trigger the "strategy.next()" and "strategy.init()" functions every time new data is received. (I'll explain why the "strategy.init()" function is necessary later on.)

Next, you will need to create a "DataFeed" class to fetch the data via ccxt/api/websocket. You can set it up however you like.

After that, create a new broker based on the broker's sdk/api. Since I trade in cryptocurrency, I created one based on ccxt. I also created a paper trading broker for testing purposes.

In "backtesting.py," all indicators are calculated when "Backtest" is initialized within the "strategy.init()" function. Therefore, it is necessary to recalculate the indicator each time new data is received. However, the current logic of "backtesting.py" will dump the indicator function, making it impossible to recalculate. A lazy solution would be to re-call the "strategy.init()" function. However, if you have other states inside "strategy.init()" (which is very common), you will lose the state each time you call "strategy.init()." To address this, I created another abstract method for the "Strategy" class that is dedicated to state initialization, and ensured that it is called only once.

With the "DataFeed"/ "LiveTrading" (replacing "Backtest")/ "Broker," you can easily build your own robot with just a few modifications to your strategy code (making it less likely to introduce new bugs after modification).

@ypogorelova ypogorelova linked a pull request Jun 28, 2023 that will close this issue
@milcs
Copy link

milcs commented Jan 5, 2024

My approach into using the backtesting.py together with live trading is a bit different:
Created a Multi-bot that trades multiple pairs with Binance.

  1. Backtesting.py used for actual backtest
  • Backtest select pair, interval, from start to end time, strategy settings as per normal use of Backtesting with my own changes. Primary change is related to making a buy price be the last close and not previous close. so Close[-1] instead of Close[-2]. Why? When a candle closes, recalculation is required and new candle fires a signal or not, the only possible entry is the current price or close price. Made couple other changes to support backtesting with fixed amount. Assume I want to trade 100 USDT with every trade on a BTCUSDT. The position_size has been strangely defined as 0< size <1 as being equity and >1 as being number of coins, in my BTCUSDT that would be number of BTCs to buy. With my changes I can go with 100 USDT like so:
  bt = Backtest(_df,
                bot.strategy,
                cash=self.cash,
                commission=self.commission,
                exclusive_orders=True,
                trade_on_close=True,
                size_second=self.size_second  
                )

The last param size_second is of my own adding to know if we want to use fixed amount or legacy behavior.
Somewhere in the backtesting.py I have placed (among other half a dozen places to prevent asserts):

        # update the size, calculating it from last_price if size_second
        size_orig = size
        size = size / self.last_price if self._size_second else float(size)
  1. backtesting.py used for calculating the signal and compute_stats (I like their stats)

To get my signal next couple of things is done:

  • at the end of the interval last 14 candles are fetched from Binance e.g. OHLCV
  • Backtesting is initialised with this data
  • The next() on the strategy is copied into next_signal() and slighly modified to return 'buy', 'sell', 'none' rather than calling the buy/sell/position.close()
    Like so:
# Snippet from the Strategy (SuperTrend):
    def next_signal(self):
        signal = 'None'
        if crossover(self.data.Close, self.st):
            # self.buy(size=self.position_size)  # Was 0.99
            signal = 'buy'
        elif crossover(self.st, self.data.Close):
            # self.position.close()
            signal = 'sell'
        return signal

# From the main
  strategy=Backtest(df,
                   bot.strategy,
                   cash=self.cash,
                   commission=self.commission,
                   exclusive_orders=True)
# Note that the cash/commission/exclusive_orders are not relevant as I only want strategy signal
# The settings are for the strategy params, example:
# settings = {'atr_timeperiod': 0.09, 'atr_multiplier': 0.55, 'atr_method': True, 'position_size': 100}

  strategy.run(**settings)
  signal = strategy._results._strategy.next_signal()

For the compute_stats it gets a bit more complicated.
We need equity, list of trades and ohlcv data.
So at every candle/trade these are saved externaly so that the following can be executed to get stats:

...
# My own local copy and the imports, had to change __init__.py, added _Broker to allow its import
from common.backtesting import Trade, _Broker
from common.backtesting._stats import compute_stats

 # Note: self._trades is a list()
 if trade_status == 'Bought':
            # Keep track of trading for compute stats
            broker = _Broker(data=self._data, cash=self.cash, commission=2 * self.trade_commission, margin=1,
                             trade_on_close=True, hedging=False, exclusive_orders=True, size_second=True,
                             index=self._data.index)
            self._trade = Trade(broker=broker, size=executed_qty, entry_price=avg_price, entry_bar=self.candle_cnt - 1)
elif trade_status == 'Sold':
    with suppress(AttributeError):
        # Updating the _trade exit_price with avg_price from Market Order from Binance
        self._trade._replace(exit_price=avg_price, exit_bar=self.candle_cnt-1)
        self._trades += self._trade,

# And Finally
       equity = pd.Series(self._data.equity).bfill().fillna(self.cash).values
        stats = compute_stats(
                trades=self._trades,  # broker.closed_trades,
                equity=equity,
                ohlc_data=self._data,
                risk_free_rate=0.0,
                strategy_instance=None  # strategy,
            )
          # Then cleaning the protected element
          stats_clean = {k: v for k, v in stats.items() if not str(k).startswith('_')}

Quite cool, the backtesting.py.

@sojgja
Copy link

sojgja commented Aug 1, 2024

who can integration live trade to Strategy

@kernc kernc marked this as a duplicate of #1225 Feb 23, 2025
@WheatMaThink
Copy link

WheatMaThink commented Apr 4, 2025

Improvements have been made based on the work by @OppOops.

  1. This is a test verification version, so there are many log comments that can be ignored.
  2. The self.data is continuously updated with new data, enabling it to include the latest data up to the current time.
  3. The self._mirror_data is trimmed to only include data for the smallest cycle, and the calculated indicators are assigned to self.strategy.
  4. New data is obtained from the queue.
  5. In the next version, it can be optimized by initializing Backtest with empty data, and then incrementally adding new data from the queue, making it closer to real-time trading.


import threading
import time
import traceback
from collections import deque
from copy import copy

import numpy as np
import pandas as pd

from backtesting import Backtest
from backtesting._stats import compute_stats
from backtesting._util import (
    _Data,
    _indicator_warmup_nbars,
    _strategy_indicators,
    _tqdm,
    try_,
)
from backtesting.backtesting import Strategy, _Broker, _OutOfMoneyError

# 配置日志记录器
logger = logging.getLogger(__name__)
if not logger.handlers:
    handler = logging.StreamHandler()
    handler.setFormatter(logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s"))
    logger.addHandler(handler)
    logger.setLevel(logging.DEBUG)


class _DataPatch(_Data):

    def update_data(self, new_df: pd.DataFrame, last_index=-1) -> int:
        """更新数据
        该方法确保在更新数据时保持原始DataFrame对象的引用不变,
        Args:
            new_df: 新的DataFrame数据
            last_index: 最后一个索引位置,默认为-1(最后一个元素)
        Returns:
            int: 新增的数据量
        """
        # 获取原始DataFrame对象

        df = self.raw_data_df
        old_length = len(df)

        # 验证输入数据
        if not isinstance(new_df.index, pd.DatetimeIndex):
            raise ValueError("新数据的索引必须是pd.DatetimeIndex类型")

        if new_df.empty:
            logger.warning("收到空的K线数据,跳过更新")
            return 0

        # 确保新数据的列与现有数据匹配
        required_columns = set(df.columns)
        if not all(col in new_df.columns for col in required_columns):
            raise ValueError(f"新数据缺少必要的列: {required_columns}")

        last_ts = df.index[last_index]
        # 筛选出大于参考时间戳的新数据
        new_data = new_df[new_df.index > last_ts]

        # 如果没有新数据,直接返回
        if new_data.empty:
            return 0

        # 确保数据按时间升序排序
        new_data = new_data.sort_index()

        # 将新数据添加到原始DataFrame
        for idx in new_data.index:
            # 检查数据类型,确保正确转换
            if any(np.issubdtype(type(df.iloc[0, df.columns.get_loc(col)]), np.floating) for col in df.columns if len(df) > 0):
                # 如果原始数据是浮点类型,确保新数据也是浮点类型
                df.loc[idx] = {k: float(v) for k, v in new_data.loc[idx].items() if k in df.columns}
            else:
                # 否则直接添加
                df.loc[idx] = {k: v for k, v in new_data.loc[idx].items() if k in df.columns}

        # 更新数据后的长度
        new_length = len(df)
        added_bars_len = new_length - old_length

        # 清除缓存并更新数组
        self._update()

        logger.debug(f"数据更新成功: 原长度={old_length}, 新长度={new_length}, 新增={added_bars_len},raw_id={id(self.raw_data_df)} id={id(self.df)}")
        return added_bars_len

    def capture_data(self, last_index=-1) -> pd.DataFrame:
        """
        截取当前数据 self.raw_data_df
        该方法确保在更新数据时保持原始DataFrame对象的引用不变,
        """
        # 验证索引范围

        logger.debug(f"数据截取: raw_id={id(self.raw_data_df)} id={id(self.df)}")

        if last_index < -len(self.raw_data_df) or last_index >= len(self.raw_data_df):
            last_index = -1

        # 使用iloc进行切片以保持DataFrame引用不变
        df = self.raw_data_df.iloc[last_index:]

        # 获取原始DataFrame的引用
        original_df = self.raw_data_df

        # 清空原始DataFrame
        original_df.drop(labels=original_df.index, axis=0, inplace=True)

        # 将截取的数据添加到原始DataFrame
        for idx in df.index:
            original_df.loc[idx] = df.loc[idx]

        # 清除缓存并更新数组
        self._set_length(len(original_df))
        self._update()

        logger.debug(f"数据截取成功: raw_id={id(self.raw_data_df)} id={id(self.df)}")

        return self.raw_data_df

    @property
    def raw_data_df(self) -> pd.DataFrame:
        """获取原始DataFrame数据

        该属性提供对底层原始DataFrame数据的访问。
        它通过调用父类的__df属性来获取数据。

        Returns:
            pd.DataFrame: 原始的DataFrame数据对象
        """
        # 获取父类_Data中的df属性
        return self._Data__df


class LiveTrade:
    _ST_START, _ST_LIVE, _ST_HISTORY, _ST_OVER = range(4)  # 有限状态机的状态

    def __init__(self, backtest: Backtest) -> None:
        self.backtest = backtest

        self.backtest_data: pd.DataFrame = self.backtest._data.copy(deep=False)
        self.backtest_broker: _Broker = self.backtest._broker
        self.backtest_strategy: Strategy = self.backtest._strategy

        self._finalize_trades = bool(self.backtest._finalize_trades)  # 回测结束时,是否计算进回测统计数据

        self.q_live = deque()
        self.semaphore = threading.Semaphore(0)
        self._state = self._ST_START

    def run(self, **kwargs):

        self.data = _DataPatch(self.backtest_data)  # 复制全部的数据 self._data 是pd.dataframe 包装到_Data类中
        self.broker: _Broker = self.backtest_broker(data=self.data)  # 初始化经纪商,可变动的数据data
        self.strategy: Strategy = self.backtest_strategy(self.broker, self.data, kwargs)  # 初始化策略,可变动的数据data

        self._mirror_data = _DataPatch(self.backtest_data.copy(deep=False))
        self._mirror_strategy = self.backtest_strategy(self.broker, self._mirror_data, kwargs)
        self._mirror_strategy.init()

        self._mirror_data._update()  # Strategy.init 可能已更改/添加到 data.df,重新赋值,全部的df数据, self._data:pd.DataFrame

        # 在 Strategy.next() 中使用的指标 strategy实例
        indicator_attrs = _strategy_indicators(self._mirror_strategy)
        self.minimum_period = self._current_idx = 1 + _indicator_warmup_nbars(self._mirror_strategy)  # 最小期数据长度  minimum_period
        self._run_next(start=self._current_idx, end=len(self.data.raw_data_df), indicator_attrs=indicator_attrs)

        self.thread = threading.Thread(target=self._run_thread, kwargs=kwargs, daemon=True)
        self.thread.start()

    def _run_next(self, start: int, end: int, indicator_attrs):

        with np.errstate(invalid="ignore"):
            
            for i in _tqdm(range(start, end), desc=self.run.__qualname__, unit="bar", mininterval=2, miniters=100):
                # 为 `next` 调用准备数据和指标
                self.data._set_length(self._current_idx + 1)  
                self._current_idx = self._current_idx + 1
                # _set_length 设置后,broker的_data 数据长度也会发生变化,为当前长度。
                for attr, indicator in indicator_attrs:  # attr 指标的名称,如ma1 ma2等. indicator 设置指标的最大长度,当前[-1]可以访问最新数据。
                    setattr(self.strategy, attr, indicator[..., : i + 1])  # 在最后一个维度上切片指标(2d 指标的情况)

                # 处理订单处理和经纪人事务
                try:
                    self.broker.next()
                except _OutOfMoneyError:
                    break

                # 下一个 tick,bar 关闭前的一刻
                self.strategy.next()  # 执行 buy or shell

        self.data._set_length(len(self.data.raw_data_df))

    def _run_thread(self, **kwargs):
        logger.info("K线处理线程已启动")  # 添加线程启动日志

        while True:
            try:
                if self._state == self._ST_OVER:
                    logger.info("收到退出信号,K线处理线程结束")  # 添加线程退出日志
                    return False

                try:
                    self.semaphore.acquire()
                    kline = self.q_live.popleft()  # self.qlive.get(timeout=self._qcheck))  queue.Queue
                    # 添加获取数据的日志
                    if isinstance(kline, pd.DataFrame) and not kline.empty:
                        logger.info(f"从队列获取到新的K线数据,时间范围: {kline.index.min()} - {kline.index.max()}, 数据量: {len(kline)}")
                    else:
                        logger.warning(f"从队列获取到异常数据: {type(kline)}")
                except IndexError:
                    logger.debug("队列为空,等待新数据...")  # 添加队列为空的日志
                    time.sleep(1)
                    continue

                # 添加处理数据前的日志
                logger.info(f"开始处理K线数据,数据量: {len(kline) if isinstance(kline, pd.DataFrame) else 'unknown'}")
                self._process_kline(kline)
                logger.info("K线数据处理完成")  # 添加处理完成的日志

            except Exception as e:
                logger.exception(f"run_thread异常: {e}\n{traceback.format_exc()}")  # 增强异常日志

    def _process_kline(self, kline: pd.DataFrame):
        # 一条数据
        logger.info(f"开始处理K线数据,时间范围: {kline.index.min()} - {kline.index.max()}")

        # 更新主数据
        old_len = len(self.data.raw_data_df)
        self.data.update_data(kline)
        new_len = len(self.data.raw_data_df)
        logger.info(f"主数据更新: 原长度={old_len}, 新长度={new_len}, 新增={new_len - old_len}")

        # 更新镜像数据
        old_mirror_len = len(self._mirror_data.raw_data_df)
        added_bars_len = self._mirror_data.update_data(kline)
        new_mirror_len = len(self._mirror_data.raw_data_df)
        logger.info(f"镜像数据更新: 原长度={old_mirror_len}, 新长度={new_mirror_len}, 新增={added_bars_len}")

        # 减少 self._mirror_data 的数据,加快运算。
        self._mirror_data.capture_data(-(len(kline) + self.minimum_period))
        raw_data_df = self._mirror_data.raw_data_df
        logger.info(f"镜像数据截取后长度: {len(raw_data_df)}")

        self._mirror_strategy.init()
        indicator_attrs = _strategy_indicators(self._mirror_strategy)
        logger.info(f"指标重新计算完成,指标数量: {len(indicator_attrs)}")

        self.broker._equity = np.append(self.broker._equity, [self.broker._equity[-1]] * added_bars_len)

        logger.info(f"开始运行策略,数据范围: {self.minimum_period} - {len(raw_data_df)}")
        self._run_next(start=self.minimum_period, end=len(raw_data_df), indicator_attrs=indicator_attrs)
        logger.info("策略运行完成")

    def update_kline(self, kline: pd.DataFrame):
        """
        添加K 数据
        :param kline:
        :return:
        """
        self.q_live.append(kline)
        self.semaphore.release()

    def set_state(self, state):
        self._state = state

    def get_stats(self):

        if self._finalize_trades is True:
            # 关闭任何剩余的未平仓交易,以便它们产生一些统计数据
            for trade in reversed(self.broker.trades):
                trade.close()

            # HACK: 最后一次重新运行经纪人以处理在最后
            #  策略迭代中放置的关闭订单。使用与最后
            #  经纪人迭代中相同的 OHLC 值。
            if self._current_idx < len(self.data.raw_data_df):
                try_(self.broker.next, exception=_OutOfMoneyError)

        equity = pd.Series(self.broker._equity).bfill().fillna(self.broker._cash).values
        results = _results = compute_stats(
            trades=self.broker.closed_trades,
            equity=equity,
            ohlc_data=self.backtest_data,
            risk_free_rate=0.0,
            strategy_instance=self.strategy,
        )
        return results


if __name__ == "__main__":

    import time

    from backtesting.lib import crossover
    from backtesting.test import GOOG, SMA

    # 设置更详细的日志级别
    logger.setLevel(logging.INFO)
    print("开始测试live_trader.py,重点检查run_thread方法是否能获取数据")

    class SmaCross(Strategy):
        def init(self):
            price = self.data.Close

            self.ma1 = self.I(SMA, price, 10)
            self.ma2 = self.I(SMA, price, 20)

        def next(self):
            # 减少输出频率,只在特定条件下打印
            if len(self.data) % 10 == 0:
                print(f"Strategy.next调用: 当前数据长度={len(self.data)}")

            if crossover(self.ma1, self.ma2):
                self.buy()
                print(f"买入信号触发: 时间={self.data.index[-1]}")
            elif crossover(self.ma2, self.ma1):
                self.sell()
                print(f"卖出信号触发: 时间={self.data.index[-1]}")

    # parameter to test
    num_run_pre = 120
    num_run_post = 200
    num_run_last = 800

    df_partial = pd.DataFrame(GOOG.iloc[:num_run_pre], copy=True)

    # use original backtest core class
    print("\n1. 初始化回测实例...")
    bt = Backtest(df_partial, SmaCross, commission=0.002, exclusive_orders=True)
    stats = bt.run()
    print("初始回测结果:")
    print(stats)

    print("\n2. 创建LiveTrade实例并启动...")
    live_trade = LiveTrade(bt)
    live_trade.run()
    print("LiveTrade实例已启动,线程已开始运行")

    # 等待线程启动
    print("等待3秒,确保线程已启动...")
    time.sleep(3)

    print("\n3. 获取当前统计数据...")
    print(live_trade.get_stats())

    print("\n4. 准备新的K线数据并添加到队列...")
    df_partial = pd.DataFrame(GOOG.iloc[num_run_pre:num_run_post], copy=True)
    # 将索引转换为datetime类型,以确保与实时数据格式一致
    df_partial.index = pd.to_datetime(df_partial.index)
    print(f"新K线数据准备完成,数据量: {len(df_partial)}")

    # 添加数据到队列
    live_trade.update_kline(df_partial)
    print("新K线数据已添加到队列 120-200")

    df_partial = pd.DataFrame(GOOG.iloc[num_run_post:num_run_last], copy=True)
    # 将索引转换为datetime类型,以确保与实时数据格式一致
    df_partial.index = pd.to_datetime(df_partial.index)
    print(f"新K线数据准备完成,数据量: {len(df_partial)}")

    # 添加数据到队列
    live_trade.update_kline(df_partial)
    print("新K线数据已添加到队列 200-800")

    # 等待线程处理数据
    print("\n5. 等待10秒,让线程有足够时间处理数据...")
    time.sleep(10)

    # 父进程等待子进程结束
    print("\n6. 等待线程结束并设置状态为OVER...")
    live_trade.thread.join(timeout=10)
    live_trade.set_state(LiveTrade._ST_OVER)

    print("\n7. 获取最终统计数据...")
    final_stats = live_trade.get_stats()
    print(final_stats)

    bt_2 = Backtest(GOOG.iloc[:num_run_last], SmaCross, commission=0.002, exclusive_orders=True)
    stats = bt_2.run()
    print("\n8. 数据比较验证回测结果:")
    print(stats)

    print("\n测试完成,run_thread方法已成功获取和处理数据")




Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
API Needs API-related discussion enhancement New feature or request
Projects
None yet
Development

Successfully merging a pull request may close this issue.