forked from robcarver17/pysystemtrade
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathib_Fx_prices_data.py
120 lines (89 loc) · 3.84 KB
/
ib_Fx_prices_data.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
import pandas as pd
from sysbrokers.IB.client.ib_fx_client import ibFxClient
from sysbrokers.IB.config.ib_fx_config import (
get_ib_config_from_file,
config_info_for_code,
get_list_of_codes,
ibFXConfig,
)
from sysbrokers.broker_fx_prices_data import brokerFxPricesData
from syscore.exceptions import missingData
from sysdata.data_blob import dataBlob
from sysobjects.spot_fx_prices import fxPrices
from syslogdiag.log_to_screen import logtoscreen
from syslogdiag.pst_logger import CURRENCY_CODE_LOG_LABEL
from syscore.constants import missing_instrument
class ibFxPricesData(brokerFxPricesData):
    """Broker FX price source backed by an Interactive Brokers connection.

    Currency codes are translated into IB currency pairs using the IB FX
    configuration, which is read with ``get_ib_config_from_file`` on first
    use and then cached on the instance. Prices are requested through an
    ``ibFxClient`` that is created lazily from the stored connection.
    """

    def __init__(
        self, ibconnection, data: dataBlob, log=logtoscreen("ibFxPricesData")
    ):
        """Store the IB connection and the data blob.

        :param ibconnection: connection object handed to ``ibFxClient``
        :param data: data blob, passed to the parent class and kept locally
        :param log: logger; the default is built once, when the class is
            defined, so instances that do not pass ``log`` share it
        """
        super().__init__(log=log, data=data)
        self._ibconnection = ibconnection
        self._dataBlob = data

    def __repr__(self):
        return "IB FX price data"

    @property
    def ibconnection(self):
        """The IB connection supplied at construction."""
        return self._ibconnection

    @property
    def ib_client(self) -> ibFxClient:
        """Return the FX client, creating and caching it on first access."""
        client = getattr(self, "_ib_client", None)
        if client is None:
            client = self._ib_client = ibFxClient(
                ibconnection=self.ibconnection, log=self.log
            )

        return client

    def get_list_of_fxcodes(self) -> list:
        """Return the currency codes listed in the IB FX configuration."""
        config_data = self._get_ib_fx_config()
        # Pass this object's logger, as the other config helpers do; the
        # builtin ``list`` type was previously passed here by mistake.
        list_of_codes = get_list_of_codes(log=self.log, config_data=config_data)

        return list_of_codes

    def _get_fx_prices_without_checking(self, currency_code: str) -> fxPrices:
        """Return IB prices for ``currency_code``.

        When the configuration lookup yields ``missing_instrument`` a warning
        is logged and an empty ``fxPrices`` is returned instead.
        """
        ib_config_for_code = self._get_config_info_for_code(currency_code)
        if ib_config_for_code is missing_instrument:
            log = self.log.setup(**{CURRENCY_CODE_LOG_LABEL: currency_code})
            log.warn("Can't get prices as missing IB config for %s" % currency_code)
            return fxPrices.create_empty()

        data = self._get_fx_prices_with_ib_config(currency_code, ib_config_for_code)

        return data

    def _get_fx_prices_with_ib_config(
        self, currency_code: str, ib_config_for_code: ibFXConfig
    ) -> fxPrices:
        """Download prices for a configured pair and wrap them as ``fxPrices``.

        Returns an empty ``fxPrices`` (after logging a warning) when no raw
        prices come back. When the configuration entry has ``invert`` set,
        the reciprocal of the raw prices is used.
        """
        raw_fx_prices_as_series = self._get_raw_fx_prices(ib_config_for_code)

        log = self.log.setup(**{CURRENCY_CODE_LOG_LABEL: currency_code})

        if len(raw_fx_prices_as_series) == 0:
            log.warn(
                "No available IB prices for %s %s"
                % (currency_code, str(ib_config_for_code))
            )
            return fxPrices.create_empty()

        if ib_config_for_code.invert:
            raw_fx_prices = 1.0 / raw_fx_prices_as_series
        else:
            raw_fx_prices = raw_fx_prices_as_series

        # turn into a fxPrices
        fx_prices = fxPrices(raw_fx_prices)

        log.msg("Downloaded %d prices" % len(fx_prices))

        return fx_prices

    def _get_raw_fx_prices(self, ib_config_for_code: ibFXConfig) -> pd.Series:
        """Fetch daily data for the configured pair and return its "FINAL" entry.

        If the client raises ``missingData`` an empty series is returned so
        the caller can treat it as "no prices available".
        """
        ccy1 = ib_config_for_code.ccy1
        ccy2 = ib_config_for_code.ccy2

        try:
            raw_fx_prices = self.ib_client.broker_get_daily_fx_data(ccy1, ccy2)
        except missingData:
            # Explicit dtype: the default dtype of an empty Series differs
            # between pandas versions and older ones warn when it is omitted.
            return pd.Series(dtype="float64")

        raw_fx_prices_as_series = raw_fx_prices["FINAL"]

        return raw_fx_prices_as_series

    def _get_config_info_for_code(self, currency_code: str) -> ibFXConfig:
        """Look up the IB configuration entry for ``currency_code``.

        The caller compares the result against ``missing_instrument``, so the
        lookup may return that sentinel rather than an ``ibFXConfig``.
        """
        config_data = self._get_ib_fx_config()
        ib_config_for_code = config_info_for_code(
            config_data=config_data, log=self.log, currency_code=currency_code
        )

        return ib_config_for_code

    # Configuration read in and cache
    def _get_ib_fx_config(self) -> pd.DataFrame:
        """Return the cached IB FX configuration, loading it on first use."""
        config = getattr(self, "_config", None)
        if config is None:
            config = self._get_and_set_ib_config_from_file()

        return config

    def _get_and_set_ib_config_from_file(self) -> pd.DataFrame:
        """Read the IB FX configuration and store it on ``self._config``."""
        config_data = get_ib_config_from_file(log=self.log)
        self._config = config_data

        return config_data