-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathlink2card.py
148 lines (112 loc) · 4.42 KB
/
link2card.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
from typing import Literal, Callable
from pydantic import BaseModel
from typing import Union
from json import loads
from re import search
import contextlib
import httpx
from nonebot.adapters.onebot.v11 import MessageSegment
from .exceptions import NextMusicPlatform
from .data import MusicPlatform
# Origin of a QQ Music share link: the web page ('web') or the mobile client ('client').
QLinkSource = Literal['web', 'client']
class QLinkActionAttrs(BaseModel):
    """Parameters that drive the parsing of a QQ Music share link.

    - source (QLinkSource): where the link came from
    - pattern (str): regular expression searched for in the fetched page
    - url (str): URL to request
    - get_data (Callable): turns the regex match into parsed data
    - get_Sid (Callable): extracts the song id from the parsed data
    """
    source: QLinkSource
    pattern: str
    url: str
    get_data: Callable
    get_Sid: Callable
class Model:
    """Parsing strategies that turn a shared music link into a music card.

    Each coroutine method is one strategy; ``handle`` selects the method by
    name through ``mp.model``.
    """

    def __init__(self, query: dict, mp: MusicPlatform, path: str = '') -> None:
        """Store the parsed parts of the shared link.

        Args:
            query: Parsed query parameters. Values are indexed with ``[0]``,
                so each value is expected to be a sequence
                (presumably ``parse_qs`` output -- confirm against caller).
            mp: Description of the music platform the link belongs to.
            path: URL path of the shared link; only ``SPqq`` reads it.
        """
        self.query = query
        self.mp = mp
        # Always define the attribute so that SPqq never fails with
        # AttributeError when no (or an empty) path was supplied.
        self.path = path

    async def common(self) -> MessageSegment:
        """Generic strategy: build the card straight from the id in the query."""
        return MessageSegment.music(type_=self.mp.type_, id_=int(self.query[self.mp.Sid_key][0]))

    async def SPqq(self) -> Union[MessageSegment, None]:
        """Parse a QQ Music web or mobile-client share link.

        A path of the form ``.../songDetail/<digits>`` yields the card
        directly. Otherwise the share page is downloaded and the song id is
        extracted from the JSON embedded in it.

        Returns:
            A QQ music card, or ``None`` when the extracted song id is falsy.

        Raises:
            NextMusicPlatform: If a client link lacks the ``__`` query
                parameter, the page does not contain the expected data, or
                the embedded data cannot be parsed.
        """
        segments = self.path.split('/')
        try:
            marker = segments.index('songDetail')
        except ValueError:
            # No ``songDetail`` segment: treat the link as a client share
            # link, which needs the ``__`` query parameter.
            try:
                attrs = set_actions(self, 'client')
            except KeyError as e:
                raise NextMusicPlatform(self.mp.name) from e
        else:
            # ``songDetail`` may be the last segment; use '' rather than
            # letting an IndexError escape.
            song_id = segments[marker + 1] if marker + 1 < len(segments) else ''
            # isdecimal() (unlike isnumeric()) only accepts what int() can parse.
            if song_id.isdecimal():
                # A purely numeric id can be sent as a card right away.
                return MessageSegment.music(type_='qq', id_=int(song_id))
            attrs = set_actions(self, 'web')

        # Fetch the share page.
        async with httpx.AsyncClient() as client:
            response = await client.get(attrs.url, follow_redirects=True)
            page: str = response.text

        if not (match := search(attrs.pattern, page)):
            raise NextMusicPlatform(self.mp.name)
        try:
            sid = attrs.get_Sid(attrs.get_data(match))
        except (ValueError, KeyError, IndexError, TypeError) as e:
            # Embedded data that is malformed or has an unexpected shape is
            # treated like a page without the data at all.
            raise NextMusicPlatform(self.mp.name) from e
        return MessageSegment.music(type_='qq', id_=sid) if sid else None

    async def kg(self) -> MessageSegment:
        """Parse a Kugou Music link into a custom music card.

        Looks up the song by the ``hash`` query parameter through Kugou's
        ``getSongInfo.php`` endpoint and builds the card from the response.
        """
        song_hash = self.query["hash"][0]
        async with httpx.AsyncClient() as client:
            # Pass the hash via ``params`` so it is URL-encoded instead of
            # being spliced verbatim into the request URL.
            response = await client.get(
                'http://m.kugou.com/app/i/getSongInfo.php',
                params={'cmd': 'playInfo', 'hash': song_hash},
            )
            info: dict = response.json()
        return MessageSegment.music_custom(
            url=f'https://www.kugou.com/song/#hash={song_hash}',
            audio=info['url'],
            title=info['fileName'],
            content='',
            # The API returns a template URL; drop the size placeholder.
            img_url=info['album_img'].replace('/{size}', ''),
        )
def set_actions(model: Model, source_opt: QLinkSource) -> QLinkActionAttrs:
    """Build the parsing parameters for a QQ Music share link.

    Args:
        model: Parser instance supplying ``path`` and, for client links,
            ``mp.url_feature`` and the ``__`` query parameter.
        source_opt: Link origin. ``'web'`` selects the web-page parameters;
            any other value selects the client parameters.

    Returns:
        The ``QLinkActionAttrs`` describing what to fetch and how to pull the
        song id out of the fetched page.

    Raises:
        KeyError: For a client link when ``model.query`` has no ``__`` entry.
    """
    if source_opt != 'web':
        def client_data(match):
            return loads(match.group(1))

        def client_sid(data):
            return int(data['songList'][0]['id'])

        return QLinkActionAttrs(
            source='client',
            pattern=r"window.__ssrFirstPageData__\s*=\s*({.*})<",
            url=f'https://{model.mp.url_feature}{model.path}?__={model.query["__"][0]}',
            get_data=client_data,
            get_Sid=client_sid,
        )

    def web_data(match):
        # The embedded object is JavaScript, not JSON: neutralise ``undefined``.
        return loads(match.group(1).replace('undefined', '""'))

    def web_sid(data):
        return int(data['detail']['id'])

    return QLinkActionAttrs(
        source='web',
        pattern=r"window.__INITIAL_DATA__\s*=\s*({.*})",
        url=f'https://y.qq.com{model.path}',
        get_data=web_data,
        get_Sid=web_sid,
    )
async def handle(path: str, query: dict, mp: MusicPlatform) -> Union[MessageSegment, None]:
    """Build a music card for a parsed link.

    Args:
        path: Path component of the parsed link.
        query: Parsed query parameters of the link.
        mp: Music platform to try.

    Returns:
        The music card ``MessageSegment``, or ``None`` when no card can be
        produced (no rule matches, or the parser raises ``NextMusicPlatform``).
    """
    try:
        if path in mp.path and mp.Sid_key in query:
            # The song id is available directly in the query string.
            parser = Model(query, mp)
        elif not mp.Sid_key:
            # No id key is configured for the platform: the parser needs the path.
            parser = Model(query, mp, path)
        else:
            return None
        return await getattr(parser, mp.model)()
    except NextMusicPlatform:
        return None