From 77b6c8b1e67ec60f91b146ef987052452e5f10b0 Mon Sep 17 00:00:00 2001 From: Omers5 <31316779+Omers5@users.noreply.github.com> Date: Thu, 14 Mar 2024 14:46:28 -0400 Subject: [PATCH 1/2] Update processor_alpaca.py Update processor_alpaca to use the new Alpaca-Py instead of the deprecated alpaca-trade-api --- .../meta/data_processors/processor_alpaca.py | 93 +++++++++++++++---- 1 file changed, 73 insertions(+), 20 deletions(-) diff --git a/finrl/meta/data_processors/processor_alpaca.py b/finrl/meta/data_processors/processor_alpaca.py index 200a7f447..554f57bee 100644 --- a/finrl/meta/data_processors/processor_alpaca.py +++ b/finrl/meta/data_processors/processor_alpaca.py @@ -1,34 +1,36 @@ -from __future__ import annotations - from concurrent.futures import ProcessPoolExecutor from concurrent.futures import ThreadPoolExecutor -import alpaca_trade_api as tradeapi +#import alpaca_trade_api as tradeapi +from alpaca.data.historical import StockHistoricalDataClient +from alpaca.data.requests import StockBarsRequest +from alpaca.data.timeframe import TimeFrame + import exchange_calendars as tc import numpy as np import pandas as pd import pytz from stockstats import StockDataFrame as Sdf - +from datetime import timedelta as td +from datetime import datetime class AlpacaProcessor: - def __init__(self, API_KEY=None, API_SECRET=None, API_BASE_URL=None, api=None): - if api is None: + def __init__(self, API_KEY=None, API_SECRET=None, API_BASE_URL=None, client=None): + if client is None: try: - self.api = tradeapi.REST(API_KEY, API_SECRET, API_BASE_URL, "v2") + self.client = StockHistoricalDataClient(API_KEY, API_SECRET) except BaseException: raise ValueError("Wrong Account Info!") else: - self.api = api + self.client = client def _fetch_data_for_ticker(self, ticker, start_date, end_date, time_interval): - bars = self.api.get_bars( - ticker, - time_interval, - start=start_date.isoformat(), - end=end_date.isoformat(), - ).df - bars["symbol"] = ticker + request_params = StockBarsRequest(symbol_or_symbols=ticker, + timeframe = TimeFrame.Minute, + start = start_date, + end = end_date) + bars = self.client.get_stock_bars(request_params).df + return bars def download_data( @@ -53,7 +55,7 @@ def download_data( NY = "America/New_York" start_date = pd.Timestamp(start_date + " 09:30:00", tz=NY) end_date = pd.Timestamp(end_date + " 15:59:00", tz=NY) - + data_list = [] # Use ThreadPoolExecutor to fetch data for multiple tickers concurrently with ThreadPoolExecutor(max_workers=10) as executor: futures = [ @@ -66,11 +68,37 @@ def download_data( ) for ticker in ticker_list ] - data_list = [future.result() for future in futures] + for future in futures: + + bars = future.result() + #fix start + # Reorganize the dataframes to be in original alpaca_trade_api structure + # Rename the existing 'symbol' column if it exists + if not bars.empty: + + # Now reset the index + bars.reset_index(inplace=True) + + # Set 'timestamp' as the new index + if 'level_1' in bars.columns: + bars.rename(columns={'level_1': 'timestamp'}, inplace=True) + if 'level_0' in bars.columns: + bars.rename(columns={'level_0': 'symbol'}, inplace=True) + + bars.set_index('timestamp', inplace=True) + + # Reorder and rename columns as needed + bars = bars[['close', 'high', 'low', 'trade_count', 'open', 'volume', 'vwap', 'symbol']] + + + data_list.append(bars) + else: + print("empty") + # Combine the data data_df = pd.concat(data_list, axis=0) - + # Convert the timezone data_df = data_df.tz_convert(NY) @@ -371,7 +399,29 @@ def fetch_latest_data( ) -> pd.DataFrame: data_df = pd.DataFrame() for tic in ticker_list: - barset = self.api.get_bars([tic], time_interval, limit=limit).df # [tic] + request_params = StockBarsRequest(symbol_or_symbols=[tic], + timeframe = TimeFrame.Minute, + limit = limit) + + barset = self.client.get_stock_bars(request_params).df + # Reorganize the dataframes to be in original alpaca_trade_api structure + # Rename the existing 'symbol' column if it exists + if 'symbol' in barset.columns: + barset.rename(columns={'symbol': 'symbol_old'}, inplace=True) + + # Now reset the index + barset.reset_index(inplace=True) + + # Set 'timestamp' as the new index + if 'level_0' in barset.columns: + barset.rename(columns={'level_0': 'symbol'}, inplace=True) + if 'level_1' in bars.columns: + barset.rename(columns={'level_1': 'timestamp'}, inplace=True) + barset.set_index('timestamp', inplace=True) + + # Reorder and rename columns as needed + barset = bars[['close', 'high', 'low', 'trade_count', 'open', 'volume', 'vwap', 'symbol']] + barset["tic"] = tic barset = barset.reset_index() data_df = pd.concat([data_df, barset]) @@ -451,6 +501,9 @@ def fetch_latest_data( ) latest_price = price_array[-1] latest_tech = tech_array[-1] - turb_df = self.api.get_bars(["VIXY"], time_interval, limit=1).df + request_params = StockBarsRequest(symbol_or_symbols="VIXY", + timeframe = TimeFrame.Minute, + limit = 1) + turb_df = self.client.get_stock_bars(request_params).df latest_turb = turb_df["close"].values return latest_price, latest_tech, latest_turb From 4e3943e30791a7c72da3dcaf718c147786cd00c7 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Thu, 14 Mar 2024 18:56:07 +0000 Subject: [PATCH 2/2] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- .../meta/data_processors/processor_alpaca.py | 117 +++++++++++------- requirements.txt | 2 +- 2 files changed, 72 insertions(+), 47 deletions(-) diff --git a/finrl/meta/data_processors/processor_alpaca.py b/finrl/meta/data_processors/processor_alpaca.py index 554f57bee..80e0a2f0b 100644 --- a/finrl/meta/data_processors/processor_alpaca.py +++ b/finrl/meta/data_processors/processor_alpaca.py @@ -1,18 +1,21 @@ +from __future__ import annotations + from concurrent.futures import ProcessPoolExecutor from concurrent.futures import ThreadPoolExecutor - -#import alpaca_trade_api as tradeapi -from alpaca.data.historical import StockHistoricalDataClient -from alpaca.data.requests import StockBarsRequest -from alpaca.data.timeframe import TimeFrame +from datetime import datetime +from datetime import timedelta as td import exchange_calendars as tc import numpy as np import pandas as pd import pytz +from alpaca.data.historical import StockHistoricalDataClient +from alpaca.data.requests import StockBarsRequest +from alpaca.data.timeframe import TimeFrame from stockstats import StockDataFrame as Sdf -from datetime import timedelta as td -from datetime import datetime + +# import alpaca_trade_api as tradeapi + class AlpacaProcessor: def __init__(self, API_KEY=None, API_SECRET=None, API_BASE_URL=None, client=None): @@ -25,12 +28,14 @@ def __init__(self, API_KEY=None, API_SECRET=None, API_BASE_URL=None, client=None self.client = client def _fetch_data_for_ticker(self, ticker, start_date, end_date, time_interval): - request_params = StockBarsRequest(symbol_or_symbols=ticker, - timeframe = TimeFrame.Minute, - start = start_date, - end = end_date) + request_params = StockBarsRequest( + symbol_or_symbols=ticker, + timeframe=TimeFrame.Minute, + start=start_date, + end=end_date, + ) bars = self.client.get_stock_bars(request_params).df - + return bars def download_data( @@ -69,36 +74,45 @@ def download_data( for ticker in ticker_list ] for future in futures: - + bars = future.result() - #fix start + # fix start # Reorganize the dataframes to be in original alpaca_trade_api structure # Rename the existing 'symbol' column if it exists if not bars.empty: - + # Now reset the index bars.reset_index(inplace=True) - + # Set 'timestamp' as the new index - if 'level_1' in bars.columns: - bars.rename(columns={'level_1': 'timestamp'}, inplace=True) - if 'level_0' in bars.columns: - bars.rename(columns={'level_0': 'symbol'}, inplace=True) + if "level_1" in bars.columns: + bars.rename(columns={"level_1": "timestamp"}, inplace=True) + if "level_0" in bars.columns: + bars.rename(columns={"level_0": "symbol"}, inplace=True) + + bars.set_index("timestamp", inplace=True) - bars.set_index('timestamp', inplace=True) - # Reorder and rename columns as needed - bars = bars[['close', 'high', 'low', 'trade_count', 'open', 'volume', 'vwap', 'symbol']] - - + bars = bars[ + [ + "close", + "high", + "low", + "trade_count", + "open", + "volume", + "vwap", + "symbol", + ] + ] + data_list.append(bars) else: print("empty") - # Combine the data data_df = pd.concat(data_list, axis=0) - + # Convert the timezone data_df = data_df.tz_convert(NY) @@ -399,29 +413,40 @@ def fetch_latest_data( ) -> pd.DataFrame: data_df = pd.DataFrame() for tic in ticker_list: - request_params = StockBarsRequest(symbol_or_symbols=[tic], - timeframe = TimeFrame.Minute, - limit = limit) - + request_params = StockBarsRequest( + symbol_or_symbols=[tic], timeframe=TimeFrame.Minute, limit=limit + ) + barset = self.client.get_stock_bars(request_params).df # Reorganize the dataframes to be in original alpaca_trade_api structure # Rename the existing 'symbol' column if it exists - if 'symbol' in barset.columns: - barset.rename(columns={'symbol': 'symbol_old'}, inplace=True) - + if "symbol" in barset.columns: + barset.rename(columns={"symbol": "symbol_old"}, inplace=True) + # Now reset the index barset.reset_index(inplace=True) - + # Set 'timestamp' as the new index - if 'level_0' in barset.columns: - barset.rename(columns={'level_0': 'symbol'}, inplace=True) - if 'level_1' in bars.columns: - barset.rename(columns={'level_1': 'timestamp'}, inplace=True) - barset.set_index('timestamp', inplace=True) - + if "level_0" in barset.columns: + barset.rename(columns={"level_0": "symbol"}, inplace=True) + if "level_1" in bars.columns: + barset.rename(columns={"level_1": "timestamp"}, inplace=True) + barset.set_index("timestamp", inplace=True) + # Reorder and rename columns as needed - barset = bars[['close', 'high', 'low', 'trade_count', 'open', 'volume', 'vwap', 'symbol']] - + barset = bars[ + [ + "close", + "high", + "low", + "trade_count", + "open", + "volume", + "vwap", + "symbol", + ] + ] + barset["tic"] = tic barset = barset.reset_index() data_df = pd.concat([data_df, barset]) @@ -501,9 +526,9 @@ def fetch_latest_data( ) latest_price = price_array[-1] latest_tech = tech_array[-1] - request_params = StockBarsRequest(symbol_or_symbols="VIXY", - timeframe = TimeFrame.Minute, - limit = 1) + request_params = StockBarsRequest( + symbol_or_symbols="VIXY", timeframe=TimeFrame.Minute, limit=1 + ) turb_df = self.client.get_stock_bars(request_params).df latest_turb = turb_df["close"].values return latest_price, latest_tech, latest_turb diff --git a/requirements.txt b/requirements.txt index 78291dd7c..2c62a4345 100644 --- a/requirements.txt +++ b/requirements.txt @@ -43,7 +43,7 @@ swig tensorboardX wheel>=0.33.6 +wrds # market data & paper trading API yfinance -wrds