This repository was archived by the owner on Nov 25, 2021. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 147
/
Copy pathBybitWebsocket.py
269 lines (219 loc) · 9.39 KB
/
BybitWebsocket.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
import ssl
import websocket
import threading
import traceback
from time import sleep
import json
import logging
import urllib
import math
import time
import hmac
# This is a simple adapters for connecting to Bybit's websocket API.
# You could use methods whose names begin with “subscribe”, and get result by "get_data" method.
# All the provided websocket APIs are implemented, includes public and private topic.
# Private methods are only available after authorization, but public methods do not.
# If you have any questions during use, please contact us vim the e-mail "[email protected]".
class BybitWebsocket:
# User can ues MAX_DATA_CAPACITY to control memory usage.
MAX_DATA_CAPACITY = 200
# funding, account, leverage
PRIVATE_TOPIC = ['position', 'execution', 'order', 'stop_order', 'wallet']
USD_SYMBOLS = ['BTCUSD', 'ETHUSD', 'EOSUSD', 'XRPUSD']
WS_OPS = ['auth', 'subscribe']
def __init__(self, wsURL, api_key, api_secret):
'''Initialize'''
self.logger = logging.getLogger(__name__)
self.logger.debug("Initializing WebSocket.")
if api_key is not None and api_secret is None:
raise ValueError('api_secret is required if api_key is provided')
if api_key is None and api_secret is not None:
raise ValueError('api_key is required if api_secret is provided')
self.api_key = api_key
self.api_secret = api_secret
self.data = {}
self.exited = False
self.auth = False
# We can subscribe right in the connection querystring, so let's build that.
# Subscribe to all pertinent endpoints
self.logger.info("Connecting to %s" % wsURL)
self.__connect(wsURL)
def exit(self):
'''Call this to exit - will close websocket.'''
self.exited = True
self.ws.close()
def __connect(self, wsURL):
'''Connect to the websocket in a thread.'''
self.logger.debug("Starting thread")
self.ws = websocket.WebSocketApp(wsURL,
on_message=self.__on_message,
on_close=self.__on_close,
on_open=self.__on_open,
on_error=self.__on_error,
keep_running=True)
self.wst = threading.Thread(target=lambda: self.ws.run_forever())
self.wst.daemon = True
self.wst.start()
self.logger.debug("Started thread")
# Wait for connect before continuing
retry_times = 5
while not self.ws.sock or not self.ws.sock.connected and retry_times:
sleep(1)
retry_times -= 1
if retry_times == 0 and not self.ws.sock.connected:
self.logger.error("Couldn't connect to WebSocket! Exiting.")
self.exit()
raise websocket.WebSocketTimeoutException('Error!Couldn not connect to WebSocket!.')
if self.api_key and self.api_secret:
self.__do_auth()
def generate_signature(self, expires):
"""Generate a request signature."""
_val = 'GET/realtime' + expires
return str(hmac.new(bytes(self.api_secret, "utf-8"), bytes(_val, "utf-8"), digestmod="sha256").hexdigest())
def __do_auth(self):
expires = str(int(round(time.time()) + 1)) + "000"
signature = self.generate_signature(expires)
auth = {}
auth["op"] = "auth"
auth["args"] = [self.api_key, expires, signature]
args = json.dumps(auth)
self.ws.send(args)
def __on_message(self, message):
'''Handler for parsing WS messages.'''
message = json.loads(message)
if 'success' in message and message["success"]:
if 'request' in message and message["request"]["op"] == 'auth':
self.auth = True
self.logger.info("Authentication success.")
if 'ret_msg' in message and message["ret_msg"] == 'pong':
self.data["pong"].append("PING success")
if 'topic' in message:
self.data[message["topic"]].append(message)
if len(self.data[message["topic"]]) > BybitWebsocket.MAX_DATA_CAPACITY:
self.data[message["topic"]] = self.data[message["topic"]][BybitWebsocket.MAX_DATA_CAPACITY // 2:]
def __on_error(self, error):
'''Called on fatal websocket errors. We exit on these.'''
if not self.exited:
self.logger.error("Error : %s" % error)
raise websocket.WebSocketException(error)
def __on_open(self):
'''Called when the WS opens.'''
self.logger.debug("Websocket Opened.")
def __on_close(self):
'''Called on websocket close.'''
self.logger.info('Websocket Closed')
def ping(self):
self.ws.send('{"op":"ping"}')
if 'pong' not in self.data:
self.data['pong'] = []
def subscribe_kline(self, symbol: str, interval: str):
param = {}
param['op'] = 'subscribe'
if self.is_inverse(symbol):
topic_name = 'klineV2.' + interval + '.' + symbol
else:
topic_name = 'candle.' + interval + '.' + symbol
param['args'] = [topic_name]
self.ws.send(json.dumps(param))
if topic_name not in self.data:
self.data[topic_name] = []
def subscribe_trade(self, symbol: str):
topic_name = 'trade.' + symbol
param = {'op': 'subscribe', 'args': [topic_name]}
self.ws.send(json.dumps(param))
if topic_name not in self.data:
self.data[topic_name] = []
def subscribe_insurance(self):
self.ws.send('{"op":"subscribe","args":["insurance"]}')
if 'insurance.BTC' not in self.data:
self.data['insurance.BTC'] = []
self.data['insurance.XRP'] = []
self.data['insurance.EOS'] = []
self.data['insurance.ETH'] = []
def subscribe_orderBookL2(self, symbol, level=None):
param = {}
param['op'] = 'subscribe'
if level is None:
topic = 'orderBookL2_25.' + symbol
else:
topic = 'orderBook_{level}.100ms.{symbol}'.format(level=level, symbol=symbol)
print(topic)
param['args'] = [topic]
self.ws.send(json.dumps(param))
if topic not in self.data:
self.data[topic] = []
def subscribe_instrument_info(self, symbol):
param = {}
param['op'] = 'subscribe'
param['args'] = ['instrument_info.100ms.' + symbol]
self.ws.send(json.dumps(param))
if 'instrument_info.100ms.' + symbol not in self.data:
self.data['instrument_info.100ms.' + symbol] = []
def subscribe_position(self):
self.ws.send('{"op":"subscribe","args":["position"]}')
if 'position' not in self.data:
self.data['position'] = []
def subscribe_execution(self):
self.ws.send('{"op":"subscribe","args":["execution"]}')
if 'execution' not in self.data:
self.data['execution'] = []
def subscribe_order(self):
self.ws.send('{"op":"subscribe","args":["order"]}')
if 'order' not in self.data:
self.data['order'] = []
def subscribe_stop_order(self):
self.ws.send('{"op":"subscribe","args":["stop_order"]}')
if 'wallet' not in self.data:
self.data['stop_order'] = []
def subscribe_wallet(self):
self.ws.send('{"op":"subscribe","args":["wallet"]}')
if 'wallet' not in self.data:
self.data['wallet'] = []
def get_kline(self, symbol, interval):
if self.is_inverse(symbol):
topic_name = 'klineV2.' + interval + '.' + symbol
else:
topic_name = 'candle.' + interval + '.' + symbol
if topic_name in self.data and len(self.data[topic_name]) > 0:
return self.data[topic_name].pop(0)
else:
return []
def get_orderBookL2(self, symbol, level=None):
if level is None:
return self.get_data("orderBookL2_25." + symbol)
else:
return self.get_data("orderBook_200.100ms." + symbol)
def get_stop_order(self):
return self.get_data("stop_order")
def get_order(self):
return self.get_data('order')
def get_execution(self):
return self.get_data('execution')
def get_position(self):
return self.get_data('position')
def get_wallet(self):
return self.get_data('wallet')
def get_trade(self, symbol):
return self.get_data('trade' + '.' + symbol)
def get_instrument_info(self, symbol):
return self.get_data("instrument_info.100ms." + symbol)
def get_insurance(self, coin):
return self.get_data("insurance." + coin)
def get_data(self, topic):
if topic not in self.data:
self.logger.info(" The topic %s is not subscribed." % topic)
return []
if topic.split('.')[0] in BybitWebsocket.PRIVATE_TOPIC and not self.auth and 'request' in self.data \
and self.data['request']['op'] not in BybitWebsocket.WS_OPS:
self.logger.info("Authentication failed. Please check your api_key and api_secret. Topic: %s" % topic)
return []
else:
if len(self.data[topic]) == 0:
return []
return self.data[topic].pop(0)
@staticmethod
def is_inverse(symbol):
if symbol[-1] != 'T':
return True
else:
return False