From e2ca93e7af6b9c2ac9002ca2041e6a0da8b8a27f Mon Sep 17 00:00:00 2001 From: Krish Suchak <42231639+suchak1@users.noreply.github.com> Date: Mon, 6 Jan 2025 04:16:16 -0500 Subject: [PATCH] =?UTF-8?q?=F0=9F=A6=99=20feat(data):=20alpaca=20?= =?UTF-8?q?=F0=9F=A6=99=20(#241)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * add alpaca get_ohlc fx * add test * update hist script * update script and workflow #minor * use tokens in run * fix feed * try parenthesis * provider dir * test fixed --- .github/workflows/ohlc.yml | 2 + .github/workflows/sandbox.yml | 2 + hyperdrive/Constants.py | 6 ++ hyperdrive/DataSource.py | 132 ++++++++++++++++++++++++++++------ scripts/update_hist_ohlc.py | 39 +++++++--- scripts/update_ohlc.py | 39 +++++++--- test/test_DataSource.py | 53 ++++++++------ 7 files changed, 212 insertions(+), 61 deletions(-) diff --git a/.github/workflows/ohlc.yml b/.github/workflows/ohlc.yml index 481c63d..438c352 100755 --- a/.github/workflows/ohlc.yml +++ b/.github/workflows/ohlc.yml @@ -44,4 +44,6 @@ jobs: AWS_DEFAULT_REGION: ${{ secrets.AWS_DEFAULT_REGION }} S3_BUCKET: ${{ secrets.S3_BUCKET }} POLYGON: ${{ secrets.POLYGON }} + ALPACA: ${{ secrets.ALPACA }} + ALPACA_SECRET: ${{ secrets.ALPACA_SECRET }} run: python scripts/update_ohlc.py diff --git a/.github/workflows/sandbox.yml b/.github/workflows/sandbox.yml index 53066b3..521c970 100755 --- a/.github/workflows/sandbox.yml +++ b/.github/workflows/sandbox.yml @@ -27,6 +27,8 @@ env: PREF_EXCHANGE: ${{ secrets.PREF_EXCHANGE }} TEST: true GLASSNODE_PASS: ${{ secrets.GLASSNODE_PASS }} + ALPACA_PAPER: ${{ secrets.ALPACA_PAPER }} + ALPACA_PAPER_SECRET: ${{ secrets.ALPACA_PAPER_SECRET}} jobs: build: diff --git a/hyperdrive/Constants.py b/hyperdrive/Constants.py index 6da53cd..bb66f71 100755 --- a/hyperdrive/Constants.py +++ b/hyperdrive/Constants.py @@ -34,11 +34,13 @@ def get_env_bool(var_name): IDX_DIR = 'indices' # providers POLY_DIR = 'polygon' +ALPACA_DIR = 'alpaca' # models MODELS_DIR = 'models' folders = { 'polygon': POLY_DIR, + 'alpaca': ALPACA_DIR } # Column Names @@ -109,6 +111,8 @@ def get_env_bool(var_name): 'X%3ALTCUSD', 'X%3AXMRUSD', 'X%3AIOTUSD' ] +ALPC_CRYPTO_SYMBOLS = ['BTC/USD', 'ETH/USD', 'LTC/USD'] + SENTIMENT_SYMBOLS_IGNORE = { 'SPYD', 'VWDRY', 'BPMP', 'FOX', 'YYY', 'SDIV', @@ -124,6 +128,8 @@ def get_env_bool(var_name): FEW_DAYS = str(FEW) + 'd' SCRIPT_FAILURE_THRESHOLD = 0.95 +ALPACA_FREE_DELAY = 0.5 + # Exchanges BINANCE = 'BINANCE' KRAKEN = 'KRAKEN' diff --git a/hyperdrive/DataSource.py b/hyperdrive/DataSource.py index c1dbaf9..575a634 100755 --- a/hyperdrive/DataSource.py +++ b/hyperdrive/DataSource.py @@ -434,6 +434,16 @@ def save_ndx(self, **kwargs): if os.path.exists(filename): return filename + def log_api_call_time(self): + self.last_api_call_time = time() + + def obey_free_limit(self, free_delay): + if self.free and hasattr(self, 'last_api_call_time'): + time_since_last_call = time() - self.last_api_call_time + delay = free_delay - time_since_last_call + if delay > 0: + sleep(delay) + class Indices(MarketData): def __init__(self): @@ -448,6 +458,102 @@ def get_ndx(self, date=datetime.now()): return self.standardize_ndx(df) +class Alpaca(MarketData): + # AlpacaData + def __init__( + self, + token=os.environ.get('ALPACA'), + secret=os.environ.get('ALPACA_SECRET'), + free=True, + paper=False + ): + super().__init__() + self.base = 'https://data.alpaca.markets' + self.token = os.environ.get( + 'ALPACA_PAPER') if paper or C.TEST else token + self.secret = os.environ.get( + 'ALPACA_PAPER_SECRET') if paper or C.TEST else secret + if not (self.token and self.secret): + raise Exception('missing Alpaca credentials') + self.provider = 'alpaca' + self.free = free + + # def get_dividends(self, **kwargs): + # pass + # def get_splits(self, **kwargs): + # pass + + def get_ohlc(self, **kwargs): + def _get_ohlc(symbol, timeframe='max'): + is_crypto = symbol in C.ALPC_CRYPTO_SYMBOLS + version = 'v1beta3' if is_crypto else 'v2' + page_token = None + start, _ = self.traveller.convert_dates(timeframe) + parts = [ + self.base, + version, + 'crypto/us' if is_crypto else 'stocks', + 'bars', + ] + url = '/'.join(parts) + pre_params = { + 'symbols': symbol, + 'timeframe': '1D', + 'start': start, + 'limit': 10000, + } | ({} if is_crypto else {'adjustment': 'all', 'feed': 'iex'}) + headers = { + 'APCA-API-KEY-ID': self.token, + 'APCA-API-SECRET-KEY': self.secret + } + results = [] + while True: + self.obey_free_limit(C.ALPACA_FREE_DELAY) + try: + post_params = { + 'page_token': page_token} if page_token else {} + params = pre_params | post_params + response = requests.get(url, params, headers=headers) + if not response.ok: + raise Exception( + 'Invalid response from Alpaca for OHLC', + response.status_code, + response.text + ) + data = response.json() + if data.get('bars') and data['bars'].get(symbol): + results += data['bars'][symbol] + finally: + self.log_api_call_time() + if data.get('next_page_token'): + page_token = data['next_page_token'] + else: + break + df = pd.DataFrame(results) + columns = { + 't': 'date', + 'o': 'open', + 'h': 'high', + 'l': 'low', + 'c': 'close', + 'v': 'volume', + 'vw': 'average', + 'n': 'trades' + } + df = df.rename(columns=columns) + df['date'] = pd.to_datetime(df['date']).dt.tz_convert( + C.TZ).dt.tz_localize(None) + df = self.standardize_ohlc(symbol, df) + return self.reader.data_in_timeframe(df, C.TIME, timeframe) + return self.try_again(func=_get_ohlc, **kwargs) + + # def get_intraday(self, **kwargs): + # pass + + # def get_news(self, **kwargs): + # pass + + class Polygon(MarketData): def __init__(self, token=os.environ.get('POLYGON'), free=True): super().__init__() @@ -465,19 +571,9 @@ def paginate(self, gen, apply): results.append(apply(item)) return results - def obey_free_limit(self): - if self.free and hasattr(self, 'last_api_call_time'): - time_since_last_call = time() - self.last_api_call_time - delay = C.POLY_FREE_DELAY - time_since_last_call - if delay > 0: - sleep(delay) - - def log_api_call_time(self): - self.last_api_call_time = time() - def get_dividends(self, **kwargs): def _get_dividends(symbol, timeframe='max'): - self.obey_free_limit() + self.obey_free_limit(C.POLY_FREE_DELAY) try: start, _ = self.traveller.convert_dates(timeframe) response = self.paginate( @@ -495,8 +591,6 @@ def _get_dividends(symbol, timeframe='max'): 'amount': div.cash_amount } ) - except Exception as e: - raise e finally: self.log_api_call_time() raw = pd.DataFrame(response) @@ -506,7 +600,7 @@ def _get_dividends(symbol, timeframe='max'): def get_splits(self, **kwargs): def _get_splits(symbol, timeframe='max'): - self.obey_free_limit() + self.obey_free_limit(C.POLY_FREE_DELAY) try: start, _ = self.traveller.convert_dates(timeframe) response = self.paginate( @@ -522,8 +616,6 @@ def _get_splits(symbol, timeframe='max'): 'ratio': split.split_from / split.split_to } ) - except Exception as e: - raise e finally: self.log_api_call_time() raw = pd.DataFrame(response) @@ -536,14 +628,12 @@ def _get_ohlc(symbol, timeframe='max'): is_crypto = symbol.find('X%3A') == 0 formatted_start, formatted_end = self.traveller.convert_dates( timeframe) - self.obey_free_limit() + self.obey_free_limit(C.POLY_FREE_DELAY) try: response = self.client.get_aggs( symbol, 1, 'day', from_=formatted_start, to=formatted_end, adjusted=True, limit=C.POLY_MAX_AGGS_LIMIT ) - except Exception as e: - raise e finally: self.log_api_call_time() @@ -573,7 +663,7 @@ def _get_intraday(symbol, min=1, timeframe='max', extra_hrs=True): raise Exception(f'No dates in timeframe: {timeframe}.') for _, date in enumerate(dates): - self.obey_free_limit() + self.obey_free_limit(C.POLY_FREE_DELAY) try: response = self.client.get_aggs( symbol, min, 'minute', from_=date, to=date, @@ -582,8 +672,6 @@ def _get_intraday(symbol, min=1, timeframe='max', extra_hrs=True): except exceptions.NoResultsError: # This is to prevent breaking the loop over weekends continue - except Exception as e: - raise e finally: self.log_api_call_time() diff --git a/scripts/update_hist_ohlc.py b/scripts/update_hist_ohlc.py index 9c2825f..727388e 100755 --- a/scripts/update_hist_ohlc.py +++ b/scripts/update_hist_ohlc.py @@ -1,20 +1,22 @@ import os -import sys from multiprocessing import Process -sys.path.append('hyperdrive') -from DataSource import Polygon # noqa autopep8 -from Constants import CI, PathFinder, POLY_CRYPTO_SYMBOLS # noqa autopep8 - +from hyperdrive.DataSource import Polygon, Alpaca +from hyperdrive.Constants import PathFinder +import hyperdrive.Constants as C +alpc = Alpaca(paper=C.TEST) poly = Polygon(os.environ['POLYGON']) stock_symbols = poly.get_symbols() -crypto_symbols = POLY_CRYPTO_SYMBOLS -all_symbols = stock_symbols + crypto_symbols +poly_symbols = stock_symbols + C.POLY_CRYPTO_SYMBOLS +alpc_symbols = set(alpc.get_ndx()[C.SYMBOL]).union(stock_symbols) timeframe = '2m' +# Double redundancy +# 1st pass + def update_poly_ohlc(): - for symbol in all_symbols: + for symbol in poly_symbols: filename = PathFinder().get_ohlc_path( symbol=symbol, provider=poly.provider) try: @@ -23,10 +25,29 @@ def update_poly_ohlc(): print(f'Polygon.io OHLC update failed for {symbol}.') print(e) finally: - if CI and os.path.exists(filename): + if C.CI and os.path.exists(filename): + os.remove(filename) + +# 2nd pass + + +def update_alpc_ohlc(): + for symbol in alpc_symbols: + filename = PathFinder().get_ohlc_path( + symbol=symbol, provider=alpc.provider) + try: + alpc.save_ohlc(symbol=symbol, timeframe=timeframe) + except Exception as e: + print(f'Alpaca OHLC update failed for {symbol}.') + print(e) + finally: + if C.CI and os.path.exists(filename): os.remove(filename) p1 = Process(target=update_poly_ohlc) +p2 = Process(target=update_alpc_ohlc) p1.start() +p2.start() p1.join() +p2.join() diff --git a/scripts/update_ohlc.py b/scripts/update_ohlc.py index 7dbb191..2cdbb11 100755 --- a/scripts/update_ohlc.py +++ b/scripts/update_ohlc.py @@ -1,23 +1,22 @@ import os -import sys from multiprocessing import Process, Value -sys.path.append('hyperdrive') -from DataSource import Polygon # noqa autopep8 -from Constants import PathFinder, POLY_CRYPTO_SYMBOLS, FEW_DAYS # noqa autopep8 -import Constants as C # noqa autopep8 +from hyperdrive.DataSource import Polygon, Alpaca +from hyperdrive.Constants import PathFinder +import hyperdrive.Constants as C counter = Value('i', 0) +alpc = Alpaca(paper=C.TEST) poly = Polygon(os.environ['POLYGON']) stock_symbols = poly.get_symbols() -crypto_symbols = POLY_CRYPTO_SYMBOLS -all_symbols = stock_symbols + crypto_symbols +poly_symbols = stock_symbols + C.POLY_CRYPTO_SYMBOLS +alpc_symbols = set(alpc.get_ndx()[C.SYMBOL]).union(stock_symbols) def update_poly_ohlc(): - for symbol in all_symbols: + for symbol in poly_symbols: try: filename = poly.save_ohlc( - symbol=symbol, timeframe=FEW_DAYS, retries=1) + symbol=symbol, timeframe=C.FEW_DAYS, retries=1) with counter.get_lock(): counter.value += 1 except Exception as e: @@ -30,9 +29,29 @@ def update_poly_ohlc(): os.remove(filename) +def update_alpc_ohlc(): + for symbol in alpc_symbols: + try: + filename = alpc.save_ohlc( + symbol=symbol, timeframe=C.FEW_DAYS, retries=1) + with counter.get_lock(): + counter.value += 1 + except Exception as e: + print(f'Alpaca OHLC update failed for {symbol}.') + print(e) + finally: + filename = PathFinder().get_ohlc_path( + symbol=symbol, provider=alpc.provider) + if C.CI and os.path.exists(filename): + os.remove(filename) + + p1 = Process(target=update_poly_ohlc) +p2 = Process(target=update_alpc_ohlc) p1.start() +p2.start() p1.join() +p2.join() -if counter.value / len(all_symbols) < 0.95: +if counter.value / (len(poly_symbols) + len(alpc_symbols)) < 0.95: exit(1) diff --git a/test/test_DataSource.py b/test/test_DataSource.py index 470810a..326433a 100755 --- a/test/test_DataSource.py +++ b/test/test_DataSource.py @@ -1,29 +1,23 @@ import os -import sys import pytest from time import sleep, time from random import choice import pandas as pd -sys.path.append('hyperdrive') -from DataSource import MarketData, Indices, Polygon, \ - LaborStats, Glassnode # noqa autopep8 -import Constants as C # noqa autopep8 -from Workflow import Flow # noqa autopep8 -from Utils import SwissArmyKnife # noqa autopep8 - - -md = MarketData() -idc = Indices() -poly = Polygon() -bls = LaborStats() -glass = Glassnode(use_cookies=True) +from hyperdrive.DataSource import MarketData, Indices, Polygon, \ + LaborStats, Glassnode, Alpaca # noqa autopep8 +import hyperdrive.Constants as C # noqa autopep8 +from hyperdrive.Workflow import Flow # noqa autopep8 +from hyperdrive.Utils import SwissArmyKnife # noqa autopep8 + + flow = Flow() knife = SwissArmyKnife() - -md = knife.use_dev(md) -poly = knife.use_dev(poly) -bls = knife.use_dev(bls) -glass = knife.use_dev(glass) +md = knife.use_dev(MarketData()) +idc = knife.use_dev(Indices()) +alpc = knife.use_dev(Alpaca(paper=True)) +poly = knife.use_dev(Polygon()) +bls = knife.use_dev(LaborStats()) +glass = knife.use_dev(Glassnode(use_cookies=True)) exp_symbols = ['AMZN', 'META', 'NFLX'] retries = 10 @@ -281,6 +275,25 @@ def test_get_ndx(self): assert (ndx[C.DELTA] == '+').all() +class TestAlpaca: + def test_init(self): + assert isinstance(alpc, Alpaca) + assert hasattr(alpc, 'base') + assert hasattr(alpc, 'token') + assert hasattr(alpc, 'secret') + assert hasattr(alpc, 'provider') + assert hasattr(alpc, 'free') + + def test_get_ohlc(self): + if not flow.is_any_workflow_running(): + df = alpc.get_ohlc(symbol='AAPL', timeframe='1m') + assert {C.TIME, C.OPEN, C.HIGH, C.LOW, + C.CLOSE, C.VOL, C.AVG}.issubset(df.columns) + assert len(df) > 10 + else: + print('Skipping Alpaca OHLC test because update in progress') + + class TestPolygon: def test_init(self): assert isinstance(poly, Polygon) @@ -336,7 +349,7 @@ def test_obey_free_limit(self): then = time() poly.log_api_call_time() - poly.obey_free_limit() + poly.obey_free_limit(C.POLY_FREE_DELAY) now = time() assert now - then > C.POLY_FREE_DELAY