Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

style: format code with Autopep8 #30

Merged
merged 1 commit into from
Jan 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 26 additions & 13 deletions src/plugins/ncm/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ async def song_is_open(event: Union[GroupMessageEvent, PrivateMessageEvent]) ->
if isinstance(event, GroupMessageEvent):
if info := setting.search(Q["group_id"] == event.group_id):
return info[0]["song"]
setting.insert({"group_id": event.group_id, "song": False, "list": False})
setting.insert({"group_id": event.group_id,
"song": False, "list": False})
return False
elif isinstance(event, PrivateMessageEvent):
if info := setting.search(Q["user_id"] == event.user_id):
Expand All @@ -57,14 +58,16 @@ async def playlist_is_open(
if info:
return info[0]["list"]
else:
setting.insert({"group_id": event.group_id, "song": False, "list": False})
setting.insert({"group_id": event.group_id,
"song": False, "list": False})
return False
elif isinstance(event, PrivateMessageEvent):
info = setting.search(Q["user_id"] == event.user_id)
if info:
return info[0]["list"]
else:
setting.insert({"user_id": event.user_id, "song": True, "list": True})
setting.insert({"user_id": event.user_id,
"song": True, "list": True})
return True


Expand Down Expand Up @@ -95,7 +98,8 @@ async def music_reply_rule(event: Union[GroupMessageEvent, PrivateMessageEvent])
"""功能设置"""
music_regex = on_regex(r"(song|url)\?id=([0-9]+)(|&)", priority=2, block=False)
"""歌曲id识别"""
playlist_regex = on_regex(r"playlist\?id=([0-9]+)(|&)", priority=2, block=False)
playlist_regex = on_regex(
r"playlist\?id=([0-9]+)(|&)", priority=2, block=False)
"""歌单识别"""
music_reply = on_message(priority=2, rule=Rule(music_reply_rule), block=False)
"""回复下载"""
Expand All @@ -117,7 +121,8 @@ async def receive_song(
):
_id = await nncm.search_song(keyword=song.extract_plain_text(), limit=1)
message_id = await bot.send(
event=event, message=Message(MessageSegment.music(type_="163", id_=_id))
event=event, message=Message(
MessageSegment.music(type_="163", id_=_id))
)
nncm.get_song(message_id=message_id["message_id"], nid=_id)
# try:
Expand Down Expand Up @@ -185,28 +190,34 @@ async def set_receive(
if mold in TRUE:
info[0]["song"] = True
info[0]["list"] = True
setting.update(info[0], Q["group_id"] == event.group_id)
setting.update(
info[0], Q["group_id"] == event.group_id)
msg = "已开启自动下载功能"
await bot.send(
event=event, message=Message(MessageSegment.text(msg))
event=event, message=Message(
MessageSegment.text(msg))
)
elif mold in FALSE:
info[0]["song"] = False
info[0]["list"] = False
setting.update(info[0], Q["group_id"] == event.group_id)
setting.update(
info[0], Q["group_id"] == event.group_id)
msg = "已关闭自动下载功能"
await bot.send(
event=event, message=Message(MessageSegment.text(msg))
event=event, message=Message(
MessageSegment.text(msg))
)
logger.debug(f"用户<{event.sender.nickname}>执行操作成功")
else:
if mold in TRUE:
setting.insert(
{"group_id": event.group_id, "song": True, "list": True}
{"group_id": event.group_id,
"song": True, "list": True}
)
elif mold in FALSE:
setting.insert(
{"group_id": event.group_id, "song": False, "list": False}
{"group_id": event.group_id,
"song": False, "list": False}
)
elif isinstance(event, PrivateMessageEvent):
info = setting.search(Q["user_id"] == event.user_id)
Expand All @@ -218,15 +229,17 @@ async def set_receive(
setting.update(info[0], Q["user_id"] == event.user_id)
msg = "已开启下载功能"
await bot.send(
event=event, message=Message(MessageSegment.text(msg))
event=event, message=Message(
MessageSegment.text(msg))
)
elif mold in FALSE:
info[0]["song"] = False
info[0]["list"] = False
setting.update(info[0], Q["user_id"] == event.user_id)
msg = "已关闭下载功能"
await bot.send(
event=event, message=Message(MessageSegment.text(msg))
event=event, message=Message(
MessageSegment.text(msg))
)
logger.debug(f"用户<{event.sender.nickname}>执行操作成功")
else:
Expand Down
38 changes: 25 additions & 13 deletions src/plugins/ncm/data_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@
cmd = list(nonebot.get_driver().config.command_start)[0]


class NcmLoginFailedException(Exception): pass
class NcmLoginFailedException(Exception):
pass


# ============主类=============
Expand All @@ -61,7 +62,8 @@ def load_user(session: str):

def login(self) -> bool:
try:
self.api.login.LoginViaCellphone(phone=ncm_config.ncm_phone, password=ncm_config.ncm_password)
self.api.login.LoginViaCellphone(
phone=ncm_config.ncm_phone, password=ncm_config.ncm_password)
self.get_user_info()
return True
except Exception as e:
Expand All @@ -74,25 +76,28 @@ def login(self) -> bool:
return False

def get_user_info(self) -> str:
message: str = f"欢迎您网易云用户:{GetCurrentSession().nickname} [{GetCurrentSession().uid}]";
message: str = f"欢迎您网易云用户:{GetCurrentSession().nickname} [{GetCurrentSession().uid}]"
logger.success(message)
self.save_user(DumpSessionAsString(GetCurrentSession()))
return message

def get_phone_login(self):
phone = ncm_config.ncm_phone
ctcode = int(ncm_config.ncm_ctcode)
result = self.api.login.SetSendRegisterVerifcationCodeViaCellphone(cell=phone, ctcode=ctcode)
result = self.api.login.SetSendRegisterVerifcationCodeViaCellphone(
cell=phone, ctcode=ctcode)
if not result.get('code', 0) == 200:
logger.error(result)
else:
logger.success('已发送验证码,输入验证码:')
while True:
captcha = int(input())
verified = self.api.login.GetRegisterVerifcationStatusViaCellphone(phone, captcha, ctcode)
verified = self.api.login.GetRegisterVerifcationStatusViaCellphone(
phone, captcha, ctcode)
if verified.get('code', 0) == 200:
break
result = self.api.login.LoginViaCellphone(phone, captcha=captcha, ctcode=ctcode)
result = self.api.login.LoginViaCellphone(
phone, captcha=captcha, ctcode=ctcode)
self.get_user_info()

def get_qrcode(self):
Expand Down Expand Up @@ -122,12 +127,14 @@ def get_qrcode(self):

def detail_names(self, ids: List[int]) -> List[str]:
songs: list = self.api.track.GetTrackDetail(song_ids=ids)["songs"]
detail = [(data["name"] + "-" + ",".join([names["name"] for names in data["ar"]])) for data in songs]
detail = [(data["name"] + "-" + ",".join([names["name"]
for names in data["ar"]])) for data in songs]
return detail

@logger.catch()
def get_detail(self, ids: List[int]):
data: list = self.api.track.GetTrackAudio(song_ids=ids, bitrate=ncm_config.ncm_bitrate * 1000)["data"]
data: list = self.api.track.GetTrackAudio(
song_ids=ids, bitrate=ncm_config.ncm_bitrate * 1000)["data"]
names: list = self.detail_names(ids)
for i in range(len(ids)):
data[i]['ncm_name'] = names[i]
Expand All @@ -143,7 +150,8 @@ async def music_check(self, nid: Union[int, List[int]], event: Union[GroupMessag
info = music.search(Q["id"] == i)
if info:
try:
tasks.append(asyncio.create_task(self.upload_data_file(event=event, data=info[0])))
tasks.append(asyncio.create_task(
self.upload_data_file(event=event, data=info[0])))
del_nid.append(i)
except Exception:
continue
Expand All @@ -154,7 +162,8 @@ async def music_check(self, nid: Union[int, List[int]], event: Union[GroupMessag
info = music.search(Q["id"] == nid)
if info:
try:
tasks.append(asyncio.create_task(self.upload_data_file(event=event, data=info[0])))
tasks.append(asyncio.create_task(
self.upload_data_file(event=event, data=info[0])))
return
except Exception as e:
if isinstance(e, ActionFailed) and e.info.get("retcode") != 10003:
Expand All @@ -168,7 +177,8 @@ async def music_check(self, nid: Union[int, List[int]], event: Union[GroupMessag
await self.start_upload(ids=nid, event=event)

async def search_song(self, keyword: str, limit: int = 1) -> int: # 搜索歌曲
res = self.api.cloudsearch.GetSearchResult(keyword=keyword, stype=SONG, limit=limit)
res = self.api.cloudsearch.GetSearchResult(
keyword=keyword, stype=SONG, limit=limit)
logger.debug(f"搜索歌曲{keyword},返回结果:{res}")
if "result" in res.keys():
data = res["result"]["songs"]
Expand All @@ -178,10 +188,12 @@ async def search_song(self, keyword: str, limit: int = 1) -> int: # 搜索歌
return data[0]["id"]

async def search_user(self, keyword: str, limit: int = 1): # 搜索用户
self.api.cloudsearch.GetSearchResult(keyword=keyword, stype=USER, limit=limit)
self.api.cloudsearch.GetSearchResult(
keyword=keyword, stype=USER, limit=limit)

async def search_playlist(self, keyword: str, limit: int = 1): # 搜索歌单
self.api.cloudsearch.GetSearchResult(keyword=keyword, stype=PLAYLIST, limit=limit)
self.api.cloudsearch.GetSearchResult(
keyword=keyword, stype=PLAYLIST, limit=limit)

@staticmethod
def check_message(message_id: int):
Expand Down