forked from namuan/trading-utils
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtele_spy_trade_bot.py
86 lines (68 loc) · 2.44 KB
/
tele_spy_trade_bot.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
import logging
import time
from argparse import ArgumentParser
from datetime import datetime
import schedule
from common.analyst import fetch_data_on_demand
from common.external_charts import build_chart_link
from common.plotting import plot_intraday
from common.tele_notifier import send_message_to_telegram, send_file_to_telegram
from common.trading_hours import after_hour_during_trading_day
def parse_args():
    """Parse the bot's command-line options.

    Returns:
        argparse.Namespace: ``schedule`` (int, default 1) holds the value of
        ``-s/--schedule``; ``run_once`` (bool, default False) is set by
        ``-r/--run-once``.
    """
    # Each entry is (flag names, keyword options) for one add_argument call.
    option_table = (
        (
            ("-s", "--schedule"),
            {"type": int, "default": 1, "help": "Schedule timer in hours"},
        ),
        (
            ("-r", "--run-once"),
            {"action": "store_true", "default": False, "help": "Run once"},
        ),
    )
    cli = ArgumentParser(description=__doc__)
    for flags, options in option_table:
        cli.add_argument(*flags, **options)
    return cli.parse_args()
def compile_report(spy_data):
    """Build the text report for the latest readings.

    Args:
        spy_data: Mapping with numeric ``last_close``, ``rsi_2``, ``rsi_4``,
            ``rsi_9`` and ``rsi_14`` entries.

    Returns:
        str: A banner line carrying today's date, followed by the last close
        and the four RSI values, each rendered with two decimals.
    """
    # Format every number up front with the same two-decimal spec.
    last_close = "%0.2f" % spy_data["last_close"]
    rsi_2, rsi_4, rsi_9, rsi_14 = (
        "%0.2f" % spy_data[key] for key in ("rsi_2", "rsi_4", "rsi_9", "rsi_14")
    )
    today = datetime.now().strftime("%a %d-%b-%Y")
    banner = f"------------ *{today}* ------------"
    body = f"""
*Last Close* ({last_close})
RSI2 ({rsi_2}), RSI4 ({rsi_4}), RSI9 ({rsi_9}), RSI14 ({rsi_14})
"""
    # Banner and body are separated by a single space; the body itself opens
    # with a newline.
    return f"{banner} {body}"
def send_to_telegram(output_vol_plt, chart_link, report):
    """Push one analysis run to Telegram as three consecutive sends.

    Args:
        output_vol_plt: Forwarded to ``send_file_to_telegram`` as the file to
            upload (``run_analysis`` passes the saved chart's path).
        chart_link: Text sent last, using the "HTML" format.
        report: Report text, sent first.
    """
    # Order matters for how the chat reads: report, then chart, then link.
    send_message_to_telegram(report)
    # "Vol chart" is presumably the caption/title shown with the file —
    # confirm against common.tele_notifier.
    send_file_to_telegram("Vol chart", output_vol_plt)
    link_options = {"format": "HTML", "disable_web_preview": False}
    send_message_to_telegram(chart_link, **link_options)
def run_analysis(telegram=True, output_dir="output"):
    """Plot the SPY intraday chart, build the text report and deliver it.

    Args:
        telegram: When true, pass the saved chart path, the chart link and the
            report to ``send_to_telegram``; otherwise only print the report.
        output_dir: Directory that receives ``SPY-intraday-vol.png``. It is
            created when it does not exist yet.
    """
    import os  # local import keeps this block independent of the file header

    ticker = "SPY"
    # Writing the image into a missing directory would fail, so make sure the
    # target exists first. An empty output_dir is left alone because
    # os.makedirs("") raises.
    if output_dir:
        os.makedirs(output_dir, exist_ok=True)
    plt_output_file = "{}/{}-intraday-vol.png".format(output_dir, ticker)

    intraday_plt = plot_intraday(ticker, period="2d")
    try:
        intraday_plt.savefig(plt_output_file)
    finally:
        # Always release the figure: this function is called repeatedly from
        # the scheduler loop and an unclosed plot would pile up on failures.
        intraday_plt.close()

    spy_data, _ = fetch_data_on_demand(ticker)
    report = compile_report(spy_data)
    chart_link = build_chart_link(ticker)

    if telegram:
        send_to_telegram(plt_output_file, chart_link, report)
    else:
        print(report)
def run_bot():
    """Scheduler job: run the analysis only when the trading-hours gate passes."""
    # The argument 3 is forwarded unchanged to the gate; presumably an hour
    # threshold — confirm against common.trading_hours.
    if not after_hour_during_trading_day(3):
        return
    run_analysis()
def check_if_run(schedule_in_hours):
    """Register ``run_bot`` to run every ``schedule_in_hours`` hours and loop forever.

    This function does not return; it keeps polling the scheduler.

    Args:
        schedule_in_hours: Number of hours between runs; must be at least 1.

    Raises:
        ValueError: If ``schedule_in_hours`` is less than 1 (a zero or negative
            interval would turn the polling sleep into a busy loop or an error).
    """
    if schedule_in_hours < 1:
        raise ValueError(
            "schedule_in_hours must be >= 1, got {}".format(schedule_in_hours)
        )

    print("Running {}".format(datetime.now()))
    # Use the plural unit: `.hour` is only valid for an interval of exactly 1
    # and raises for every other value, which broke any non-default --schedule.
    schedule.every(schedule_in_hours).hours.do(run_bot)

    # Poll twice per interval so a due job waits at most half an interval.
    poll_seconds = schedule_in_hours * 60 * 60 / 2
    while True:
        schedule.run_pending()
        time.sleep(poll_seconds)
def main():
    """Entry point: either run the analysis once or start the recurring scheduler."""
    cli_args = parse_args()

    if not cli_args.run_once:
        # Hands control to the scheduler loop with the --schedule value.
        check_if_run(cli_args.schedule)
        return

    # NOTE(review): no logging configuration is visible in this file, so this
    # INFO record may be filtered out by the default root level — confirm.
    logging.info(">> Running once")
    # One-off mode prints the report instead of sending it to Telegram.
    run_analysis(telegram=False)


# Run only when executed as a script, not when imported.
if __name__ == "__main__":
    main()