Skip to content

Commit

Permalink
Merge pull request #124 from lervisnh/main
Browse files Browse the repository at this point in the history
修复多线程使用的一些问题 & 增加获取基金管理人
  • Loading branch information
Micro-sheep authored Apr 25, 2023
2 parents c166d68 + 63ac9dc commit 4b094ea
Show file tree
Hide file tree
Showing 8 changed files with 308 additions and 163 deletions.
9 changes: 7 additions & 2 deletions efinance/common/getter.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,10 @@
from jsonpath import jsonpath
from retry import retry
from tqdm import tqdm
import time

from ..common.config import MARKET_NUMBER_DICT
from ..shared import BASE_INFO_CACHE, session
from ..shared import BASE_INFO_CACHE, session, MAX_CONNECTIONS
from ..utils import get_quote_id, to_numeric
from .config import (
EASTMONEY_BASE_INFO_FIELDS,
Expand All @@ -20,6 +21,8 @@
MagicConfig,
)

import warnings
warnings.filterwarnings("once")

@to_numeric
def get_realtime_quotes_by_fs(fs: str, **kwargs) -> pd.DataFrame:
Expand Down Expand Up @@ -99,7 +102,7 @@ def get_quote_history_single(
url = 'https://push2his.eastmoney.com/api/qt/stock/kline/get'

json_response = session.get(
url, headers=EASTMONEY_REQUEST_HEADERS, params=params
url, headers=EASTMONEY_REQUEST_HEADERS, params=params, verify=False
).json()
klines: List[str] = jsonpath(json_response, '$..klines[:]')
if not klines:
Expand Down Expand Up @@ -146,6 +149,8 @@ def start(code: str):

pbar = tqdm(total=total)
for code in codes:
if len(multitasking.get_active_tasks()) > MAX_CONNECTIONS:
time.sleep(3)
start(code)
multitasking.wait_for_tasks()
pbar.close()
Expand Down
2 changes: 2 additions & 0 deletions efinance/config/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,5 @@
DATA_DIR.mkdir(parents=True, exist_ok=True)
# 搜索词缓存位置
SEARCH_RESULT_CACHE_PATH = str(DATA_DIR / 'search-cache.json')

MAX_CONNECTIONS = 50
4 changes: 4 additions & 0 deletions efinance/fund/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,10 @@
get_period_change,
get_public_dates,
get_quote_history,
get_quote_history_multi,
get_realtime_increase_rate,
get_types_percentage,
get_fund_manager,
)

__all__ = [
Expand All @@ -21,5 +23,7 @@
'get_public_dates',
'get_industry_distribution',
'get_quote_history',
'get_quote_history_multi',
'get_pdf_reports',
'get_fund_manager',
]
73 changes: 60 additions & 13 deletions efinance/fund/getter.py
Original file line number Diff line number Diff line change
@@ -1,21 +1,30 @@
import os
import re
import signal
from typing import List, Union
from typing import List, Union, Dict

import threading
from bs4 import BeautifulSoup
import multitasking
import pandas as pd
import requests
import rich
from jsonpath import jsonpath
from retry import retry
from tqdm import tqdm
import time

from ..utils import to_numeric
from .config import EastmoneyFundHeaders
from ..common.config import MagicConfig
from ..shared import session, MAX_CONNECTIONS
import warnings
warnings.filterwarnings("module")

signal.signal(signal.SIGINT, multitasking.killall)
if threading.current_thread() is threading.main_thread():
signal.signal(signal.SIGINT, multitasking.killall)

fund_session = session

@retry(tries=3)
@to_numeric
Expand Down Expand Up @@ -72,7 +81,7 @@ def get_quote_history(fund_code: str, pz: int = 40000) -> pd.DataFrame:
'version': '6.2.8',
}
url = 'https://fundmobapi.eastmoney.com/FundMNewApi/FundMNHisNetList'
json_response = requests.get(url, headers=EastmoneyFundHeaders, data=data).json()
json_response = fund_session.get(url, headers=EastmoneyFundHeaders, data=data, verify=False).json()
rows = []
columns = ['日期', '单位净值', '累计净值', '涨跌幅']
if json_response is None:
Expand All @@ -94,6 +103,28 @@ def get_quote_history(fund_code: str, pz: int = 40000) -> pd.DataFrame:
df = pd.DataFrame(rows)
return df

def get_quote_history_multi(
    fund_codes: List[str], pz: int = 40000, **kwargs
) -> Union[Dict[str, pd.DataFrame], pd.DataFrame]:
    """
    Fetch the net-value history of several funds concurrently.

    Every fund code is dispatched as a ``multitasking`` task that calls
    ``get_quote_history`` (retried up to 3 times, 1 second apart) and
    stores the result under its fund code.

    Parameters
    ----------
    fund_codes : List[str]
        Fund codes to fetch.
    pz : int, optional
        Forwarded unchanged to ``get_quote_history``; defaults to ``40000``.
    **kwargs
        If ``kwargs[MagicConfig.RETURN_DF]`` is truthy, the per-fund frames
        are concatenated into a single DataFrame.

    Returns
    -------
    Union[Dict[str, DataFrame], DataFrame]
        By default a dict mapping fund code to its DataFrame. With
        ``MagicConfig.RETURN_DF`` set, one concatenated DataFrame (an empty
        DataFrame when nothing was fetched).
    """
    dfs: Dict[str, pd.DataFrame] = {}
    pbar = tqdm(total=len(fund_codes))

    @multitasking.task
    @retry(tries=3, delay=1)
    def start(fund_code: str):
        _df = get_quote_history(fund_code, pz)
        dfs[fund_code] = _df
        pbar.update(1)
        pbar.set_description_str(f'Processing => {fund_code}')

    for fund_code in fund_codes:
        # Throttle before dispatching, not inside the task: a sleep inside
        # the task only delays work that has already been started and does
        # not limit how many tasks are in flight.
        if len(multitasking.get_active_tasks()) >= MAX_CONNECTIONS:
            time.sleep(3)
        start(fund_code)
    multitasking.wait_for_tasks()
    pbar.close()
    if kwargs.get(MagicConfig.RETURN_DF):
        # pd.concat raises ValueError when given nothing to concatenate.
        if not dfs:
            return pd.DataFrame()
        return pd.concat(dfs, axis=0, ignore_index=True)
    return dfs


@retry(tries=3)
@to_numeric
Expand Down Expand Up @@ -151,7 +182,7 @@ def get_realtime_increase_rate(fund_codes: Union[List[str], str]) -> pd.DataFram
'GSZZL': '估算涨跌幅',
}
url = 'https://fundmobapi.eastmoney.com/FundMNewApi/FundMNFInfo'
json_response = requests.get(url, headers=EastmoneyFundHeaders, data=data).json()
json_response = fund_session.get(url, headers=EastmoneyFundHeaders, data=data).json()
rows = jsonpath(json_response, '$..Datas[:]')
if not rows:
df = pd.DataFrame(columns=columns.values())
Expand Down Expand Up @@ -230,13 +261,29 @@ def get_fund_codes(ft: str = None) -> pd.DataFrame:
params.append(('ft', ft))

url = 'http://fund.eastmoney.com/data/rankhandler.aspx'
response = requests.get(url, headers=headers, params=params)
response = fund_session.get(url, headers=headers, params=params)

columns = ['基金代码', '基金简称']
results = re.findall('"(\d{6}),(.*?),', response.text)
df = pd.DataFrame(results, columns=columns)
return df

@retry(tries=3)
def get_fund_manager(ft: str) -> pd.DataFrame:
    """
    Scrape the fund-manager summary of one fund from its Eastmoney F10 page.

    Parameters
    ----------
    ft : str
        Value interpolated into the ``jjjl_{ft}.html`` page URL; it is also
        written to the ``基金代码`` column of the result.

    Returns
    -------
    DataFrame
        A single row with the columns ``基金代码``, ``基金经理任职日期``,
        ``基金公司``, ``基金经理``, ``基金种类``, ``基金规模`` and ``当前日期``.
        An empty DataFrame is returned when the response is falsy or when
        the page does not contain the expected ``div.bs_gl`` label layout.
    """
    url = f'http://fundf10.eastmoney.com/jjjl_{ft}.html'
    response = fund_session.get(url)
    if not response:
        return pd.DataFrame()
    soup = BeautifulSoup(response.text, 'html.parser')
    summary = soup.find('div', class_='bs_gl')
    # find() returns None when the element is absent; treat that like a
    # failed request instead of raising AttributeError.
    if summary is None:
        return pd.DataFrame()
    contents = summary.find_all('label')
    try:
        start_date = contents[0].span.text
        managers = ";".join(a.text for a in contents[1].find_all('a'))
        type_str = contents[2].span.text
        company = contents[3].find('a').text
        # Strip line breaks and spaces embedded in the scale text.
        share = (
            contents[4].span.text
            .replace('\r', '')
            .replace('\n', '')
            .replace(' ', '')
        )
    except (AttributeError, IndexError):
        # Fewer labels than expected, or a label missing its <span>/<a>.
        return pd.DataFrame()
    today = str(pd.to_datetime('today').date())
    return pd.DataFrame(
        data=[[ft, start_date, company, managers, type_str, share, today]],
        columns=[
            "基金代码",
            '基金经理任职日期',
            '基金公司',
            '基金经理',
            '基金种类',
            '基金规模',
            '当前日期',
        ],
    )

@retry(tries=3)
@to_numeric
Expand Down Expand Up @@ -332,7 +379,7 @@ def get_invest_position(
if date is not None:
params.append(('DATE', date))
url = 'https://fundmobapi.eastmoney.com/FundMNewApi/FundMNInverstPosition'
json_response = requests.get(
json_response = fund_session.get(
url, headers=EastmoneyFundHeaders, params=params
).json()
stocks = jsonpath(json_response, '$..fundStocks[:]')
Expand Down Expand Up @@ -395,7 +442,7 @@ def get_period_change(fund_code: str) -> pd.DataFrame:
('version', '6.3.6'),
)
url = 'https://fundmobapi.eastmoney.com/FundMNewApi/FundMNPeriodIncrease'
json_response = requests.get(
json_response = fund_session.get(
url, headers=EastmoneyFundHeaders, params=params
).json()
columns = {
Expand Down Expand Up @@ -461,7 +508,7 @@ def get_public_dates(fund_code: str) -> List[str]:
('version', '6.3.8'),
)
url = 'https://fundmobapi.eastmoney.com/FundMNewApi/FundMNIVInfoMultiple'
json_response = requests.get(
json_response = fund_session.get(
url, headers=EastmoneyFundHeaders, params=params
).json()
if json_response['Datas'] is None:
Expand Down Expand Up @@ -529,7 +576,7 @@ def get_types_percentage(
params.append(('DATE', date))
params = tuple(params)
url = 'https://fundmobapi.eastmoney.com/FundMNewApi/FundMNAssetAllocationNew'
json_response = requests.get(
json_response = fund_session.get(
url, params=params, headers=EastmoneyFundHeaders
).json()

Expand Down Expand Up @@ -567,7 +614,7 @@ def get_base_info_single(fund_code: str) -> pd.Series:
('version', '6.3.8'),
)
url = 'https://fundmobapi.eastmoney.com/FundMNewApi/FundMNNBasicInformation'
json_response = requests.get(
json_response = fund_session.get(
url, headers=EastmoneyFundHeaders, params=params
).json()
columns = {
Expand Down Expand Up @@ -751,7 +798,7 @@ def get_industry_distribution(
if date is not None:
params.append(('DATE', date))
url = 'https://fundmobapi.eastmoney.com/FundMNewApi/FundMNSectorAllocation'
response = requests.get(url, headers=EastmoneyFundHeaders, params=params)
response = fund_session.get(url, headers=EastmoneyFundHeaders, params=params)
datas = response.json()['Datas']

_df = pd.DataFrame(datas)
Expand Down Expand Up @@ -815,7 +862,7 @@ def download_file(
fund_code = str(fund_code)
if not os.path.exists(save_dir + '/' + fund_code):
os.mkdir(save_dir + '/' + fund_code)
response = requests.get(url, headers=headers)
response = fund_session.get(url, headers=headers)
path = f'{save_dir}/{fund_code}/{filename}{file_type}'
with open(path, 'wb') as f:
f.write(response.content)
Expand All @@ -831,7 +878,7 @@ def download_file(
('type', '3'),
)

json_response = requests.get(
json_response = fund_session.get(
'http://api.fund.eastmoney.com/f10/JJGG', headers=headers, params=params
).json()

Expand Down
15 changes: 13 additions & 2 deletions efinance/shared/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,20 @@
import pandas as pd
import requests

from ..config import SEARCH_RESULT_CACHE_PATH
from ..config import SEARCH_RESULT_CACHE_PATH, MAX_CONNECTIONS

class CustomedSession(requests.Session):
    """A ``requests.Session`` whose requests get a default timeout."""

    def request(self, *args, **kwargs):
        """Send the request, adding a 180-second timeout if none was given.

        All positional and keyword arguments are passed through to
        ``requests.Session.request``; only a missing ``timeout`` keyword
        is filled in.
        """
        if 'timeout' not in kwargs:
            kwargs['timeout'] = 180  # 3 minutes
        return super().request(*args, **kwargs)

# Package-wide session: every request made through it gets the default
# timeout supplied by CustomedSession.
session = CustomedSession()
# One adapter serves both schemes; its connection pool is sized to
# MAX_CONNECTIONS and it is configured with max_retries=5.
adapter = requests.adapters.HTTPAdapter(
    pool_connections=MAX_CONNECTIONS,
    pool_maxsize=MAX_CONNECTIONS,
    max_retries=5,
)
session.mount('http://', adapter)
session.mount('https://', adapter)

session = requests.Session()
# 关键词搜索缓存
SEARCH_RESULT_DICT: Dict[str, dict] = dict()
# 行情ID搜索缓存
Expand Down
5 changes: 4 additions & 1 deletion efinance/stock/getter.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from datetime import datetime, timedelta
from typing import Dict, List, Union

import threading
import multitasking
import numpy as np
import pandas as pd
Expand Down Expand Up @@ -35,7 +36,9 @@
EASTMONEY_STOCK_DAILY_BILL_BOARD_FIELDS,
)

signal.signal(signal.SIGINT, multitasking.killall)
if threading.current_thread() is threading.main_thread():
signal.signal(signal.SIGINT, multitasking.killall)

python_version = sys.version_info.major, sys.version_info.minor
# * 适配 python 3.10 及其以上版本
if python_version >= (3, 10):
Expand Down
Loading

0 comments on commit 4b094ea

Please sign in to comment.