Skip to content

Commit 8d1eb81

Browse files
authored
Merge pull request #414 from pikers/agg_feedz
Agg feedz
2 parents 220d38b + 963e5bd commit 8d1eb81

28 files changed

+1321
-753
lines changed

.github/workflows/ci.yml

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,27 @@ on:
1414

1515
jobs:
1616

17+
# test that we can generate a software distribution and install it
18+
# thus avoid missing file issues after packaging.
19+
sdist-linux:
20+
name: 'sdist'
21+
runs-on: ubuntu-latest
22+
23+
steps:
24+
- name: Checkout
25+
uses: actions/checkout@v3
26+
27+
- name: Setup python
28+
uses: actions/setup-python@v2
29+
with:
30+
python-version: '3.10'
31+
32+
- name: Build sdist
33+
run: python setup.py sdist --formats=zip
34+
35+
- name: Install sdist from .zips
36+
run: python -m pip install dist/*.zip
37+
1738
testing:
1839
name: 'install + test-suite'
1940
runs-on: ubuntu-latest

dockering/ib/docker-compose.yml

Lines changed: 31 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -62,39 +62,39 @@ services:
6262
# - "127.0.0.1:4002:4002"
6363
# - "127.0.0.1:5900:5900"
6464

65-
ib_gw_live:
66-
image: waytrade/ib-gateway:1012.2i
67-
restart: always
68-
network_mode: 'host'
65+
# ib_gw_live:
66+
# image: waytrade/ib-gateway:1012.2i
67+
# restart: always
68+
# network_mode: 'host'
6969

70-
volumes:
71-
- type: bind
72-
source: ./jts_live.ini
73-
target: /root/jts/jts.ini
74-
# don't let ibc clobber this file for
75-
# the main reason of not having a stupid
76-
# timezone set..
77-
read_only: true
70+
# volumes:
71+
# - type: bind
72+
# source: ./jts_live.ini
73+
# target: /root/jts/jts.ini
74+
# # don't let ibc clobber this file for
75+
# # the main reason of not having a stupid
76+
# # timezone set..
77+
# read_only: true
7878

79-
# force our own ibc config
80-
- type: bind
81-
source: ./ibc.ini
82-
target: /root/ibc/config.ini
79+
# # force our own ibc config
80+
# - type: bind
81+
# source: ./ibc.ini
82+
# target: /root/ibc/config.ini
8383

84-
# force our noop script - socat isn't needed in host mode.
85-
- type: bind
86-
source: ./fork_ports_delayed.sh
87-
target: /root/scripts/fork_ports_delayed.sh
84+
# # force our noop script - socat isn't needed in host mode.
85+
# - type: bind
86+
# source: ./fork_ports_delayed.sh
87+
# target: /root/scripts/fork_ports_delayed.sh
8888

89-
# force our noop script - socat isn't needed in host mode.
90-
- type: bind
91-
source: ./run_x11_vnc.sh
92-
target: /root/scripts/run_x11_vnc.sh
93-
read_only: true
89+
# # force our noop script - socat isn't needed in host mode.
90+
# - type: bind
91+
# source: ./run_x11_vnc.sh
92+
# target: /root/scripts/run_x11_vnc.sh
93+
# read_only: true
9494

95-
# NOTE: to fill these out, define an `.env` file in the same dir as
96-
# this compose file which looks something like:
97-
environment:
98-
TRADING_MODE: 'live'
99-
VNC_SERVER_PASSWORD: 'doggy'
100-
VNC_SERVER_PORT: '3004'
95+
# # NOTE: to fill these out, define an `.env` file in the same dir as
96+
# # this compose file which looks something like:
97+
# environment:
98+
# TRADING_MODE: 'live'
99+
# VNC_SERVER_PASSWORD: 'doggy'
100+
# VNC_SERVER_PORT: '3004'

piker/_daemon.py

Lines changed: 27 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -35,12 +35,17 @@
3535

3636
_root_dname = 'pikerd'
3737

38-
_registry_host: str = '127.0.0.1'
39-
_registry_port: int = 6116
40-
_registry_addr = (
41-
_registry_host,
42-
_registry_port,
38+
_default_registry_host: str = '127.0.0.1'
39+
_default_registry_port: int = 6116
40+
_default_reg_addr: tuple[str, int] = (
41+
_default_registry_host,
42+
_default_registry_port,
4343
)
44+
45+
# NOTE: this value is set as an actor-global once the first endpoint
46+
# who is capable, spawns a `pikerd` service tree.
47+
_registry_addr: tuple[str, int] | None = None
48+
4449
_tractor_kwargs: dict[str, Any] = {
4550
# use a different registry addr then tractor's default
4651
'arbiter_addr': _registry_addr
@@ -152,13 +157,20 @@ async def open_pikerd(
152157
153158
'''
154159
global _services
160+
global _registry_addr
161+
162+
if (
163+
_registry_addr is None
164+
or registry_addr
165+
):
166+
_registry_addr = registry_addr or _default_reg_addr
155167

156168
# XXX: this may open a root actor as well
157169
async with (
158170
tractor.open_root_actor(
159171

160172
# passed through to ``open_root_actor``
161-
arbiter_addr=registry_addr or _registry_addr,
173+
arbiter_addr=_registry_addr,
162174
name=_root_dname,
163175
loglevel=loglevel,
164176
debug_mode=debug_mode,
@@ -197,7 +209,7 @@ async def open_piker_runtime(
197209
# XXX: you should pretty much never want debug mode
198210
# for data daemons when running in production.
199211
debug_mode: bool = False,
200-
registry_addr: None | tuple[str, int] = _registry_addr,
212+
registry_addr: None | tuple[str, int] = None,
201213

202214
) -> tractor.Actor:
203215
'''
@@ -206,13 +218,20 @@ async def open_piker_runtime(
206218
207219
'''
208220
global _services
221+
global _registry_addr
222+
223+
if (
224+
_registry_addr is None
225+
or registry_addr
226+
):
227+
_registry_addr = registry_addr or _default_reg_addr
209228

210229
# XXX: this may open a root actor as well
211230
async with (
212231
tractor.open_root_actor(
213232

214233
# passed through to ``open_root_actor``
215-
arbiter_addr=registry_addr,
234+
arbiter_addr=_registry_addr,
216235
name=name,
217236
loglevel=loglevel,
218237
debug_mode=debug_mode,

piker/brokers/__init__.py

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,10 +26,21 @@
2626

2727
__brokers__ = [
2828
'binance',
29-
'questrade',
30-
'robinhood',
3129
'ib',
3230
'kraken',
31+
32+
# broken but used to work
33+
# 'questrade',
34+
# 'robinhood',
35+
36+
# TODO: we should get on these stat!
37+
# alpaca
38+
# wstrade
39+
# iex
40+
41+
# deribit
42+
# kucoin
43+
# bitso
3344
]
3445

3546

piker/brokers/binance.py

Lines changed: 27 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -41,10 +41,15 @@
4141
SymbolNotFound,
4242
DataUnavailable,
4343
)
44-
from ..log import get_logger, get_console_log
45-
from ..data import ShmArray
44+
from ..log import (
45+
get_logger,
46+
get_console_log,
47+
)
4648
from ..data.types import Struct
47-
from ..data._web_bs import open_autorecon_ws, NoBsWs
49+
from ..data._web_bs import (
50+
open_autorecon_ws,
51+
NoBsWs,
52+
)
4853

4954
log = get_logger(__name__)
5055

@@ -142,7 +147,9 @@ class OHLC(Struct):
142147

143148

144149
# convert datetime obj timestamp to unixtime in milliseconds
145-
def binance_timestamp(when):
150+
def binance_timestamp(
151+
when: datetime
152+
) -> int:
146153
return int((when.timestamp() * 1000) + (when.microsecond / 1000))
147154

148155

@@ -181,7 +188,7 @@ async def symbol_info(
181188
params = {}
182189

183190
if sym is not None:
184-
sym = sym.upper()
191+
sym = sym.lower()
185192
params = {'symbol': sym}
186193

187194
resp = await self._api(
@@ -238,7 +245,7 @@ async def bars(
238245
) -> dict:
239246

240247
if end_dt is None:
241-
end_dt = pendulum.now('UTC')
248+
end_dt = pendulum.now('UTC').add(minutes=1)
242249

243250
if start_dt is None:
244251
start_dt = end_dt.start_of(
@@ -396,8 +403,8 @@ async def open_history_client(
396403

397404
async def get_ohlc(
398405
timeframe: float,
399-
end_dt: Optional[datetime] = None,
400-
start_dt: Optional[datetime] = None,
406+
end_dt: datetime | None = None,
407+
start_dt: datetime | None = None,
401408

402409
) -> tuple[
403410
np.ndarray,
@@ -412,25 +419,20 @@ async def get_ohlc(
412419
start_dt=start_dt,
413420
end_dt=end_dt,
414421
)
415-
start_dt = pendulum.from_timestamp(array[0]['time'])
416-
end_dt = pendulum.from_timestamp(array[-1]['time'])
417-
return array, start_dt, end_dt
422+
times = array['time']
423+
if (
424+
end_dt is None
425+
):
426+
inow = round(time.time())
427+
if (inow - times[-1]) > 60:
428+
await tractor.breakpoint()
418429

419-
yield get_ohlc, {'erlangs': 3, 'rate': 3}
430+
start_dt = pendulum.from_timestamp(times[0])
431+
end_dt = pendulum.from_timestamp(times[-1])
420432

433+
return array, start_dt, end_dt
421434

422-
async def backfill_bars(
423-
sym: str,
424-
shm: ShmArray, # type: ignore # noqa
425-
task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED,
426-
) -> None:
427-
"""Fill historical bars into shared mem / storage afap.
428-
"""
429-
with trio.CancelScope() as cs:
430-
async with open_cached_client('binance') as client:
431-
bars = await client.bars(symbol=sym)
432-
shm.push(bars)
433-
task_status.started(cs)
435+
yield get_ohlc, {'erlangs': 3, 'rate': 3}
434436

435437

436438
async def stream_quotes(
@@ -465,7 +467,7 @@ async def stream_quotes(
465467
si = sym_infos[sym] = syminfo.to_dict()
466468
filters = {}
467469
for entry in syminfo.filters:
468-
ftype = entry.pop('filterType')
470+
ftype = entry['filterType']
469471
filters[ftype] = entry
470472

471473
# XXX: after manually inspecting the response format we

piker/brokers/ib/broker.py

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -371,8 +371,8 @@ async def update_and_audit_msgs(
371371
else:
372372
entry = f'split_ratio = 1/{int(reverse_split_ratio)}'
373373

374-
raise ValueError(
375-
# log.error(
374+
# raise ValueError(
375+
log.error(
376376
f'POSITION MISMATCH ib <-> piker ledger:\n'
377377
f'ib: {ibppmsg}\n'
378378
f'piker: {msg}\n'
@@ -575,17 +575,18 @@ async def trades_dialogue(
575575
# if new trades are detected from the API, prepare
576576
# them for the ledger file and update the pptable.
577577
if api_to_ledger_entries:
578-
trade_entries = api_to_ledger_entries[acctid]
578+
trade_entries = api_to_ledger_entries.get(acctid)
579579

580-
# write ledger with all new trades **AFTER**
581-
# we've updated the `pps.toml` from the
582-
# original ledger state! (i.e. this is
583-
# currently done on exit)
584-
ledger.update(trade_entries)
580+
if trade_entries:
581+
# write ledger with all new trades **AFTER**
582+
# we've updated the `pps.toml` from the
583+
# original ledger state! (i.e. this is
584+
# currently done on exit)
585+
ledger.update(trade_entries)
585586

586-
trans = trans_by_acct.get(acctid)
587-
if trans:
588-
table.update_from_trans(trans)
587+
trans = trans_by_acct.get(acctid)
588+
if trans:
589+
table.update_from_trans(trans)
589590

590591
# XXX: not sure exactly why it wouldn't be in
591592
# the updated output (maybe this is a bug?) but
@@ -883,7 +884,7 @@ async def deliver_trade_events(
883884
# execdict.pop('acctNumber')
884885

885886
fill_msg = BrokerdFill(
886-
# should match the value returned from
887+
# NOTE: should match the value returned from
887888
# `.submit_limit()`
888889
reqid=execu.orderId,
889890
time_ns=time.time_ns(), # cuz why not

piker/brokers/ib/feed.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1047,7 +1047,13 @@ async def open_symbol_search(
10471047
stock_results = []
10481048

10491049
async def stash_results(target: Awaitable[list]):
1050-
stock_results.extend(await target)
1050+
try:
1051+
results = await target
1052+
except tractor.trionics.Lagged:
1053+
print("IB SYM-SEARCH OVERRUN?!?")
1054+
return
1055+
1056+
stock_results.extend(results)
10511057

10521058
for i in range(10):
10531059
with trio.move_on_after(3) as cs:

0 commit comments

Comments
 (0)